Prune sigverify queue (#20315)
This commit is contained in:
@ -18,6 +18,44 @@ use std::sync::mpsc::channel;
|
|||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use test::Bencher;
|
use test::Bencher;
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
fn bench_packet_discard(bencher: &mut Bencher) {
|
||||||
|
solana_logger::setup();
|
||||||
|
let len = 30 * 1000;
|
||||||
|
let chunk_size = 1024;
|
||||||
|
let tx = test_tx();
|
||||||
|
let mut batches = to_packets_chunked(&vec![tx; len], chunk_size);
|
||||||
|
|
||||||
|
let mut total = 0;
|
||||||
|
|
||||||
|
let ips: Vec<_> = (0..10_000)
|
||||||
|
.into_iter()
|
||||||
|
.map(|_| {
|
||||||
|
let mut addr = [0u16; 8];
|
||||||
|
thread_rng().fill(&mut addr);
|
||||||
|
addr
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for batch in batches.iter_mut() {
|
||||||
|
total += batch.packets.len();
|
||||||
|
for p in batch.packets.iter_mut() {
|
||||||
|
let ip_index = thread_rng().gen_range(0, ips.len());
|
||||||
|
p.meta.addr = ips[ip_index];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
info!("total packets: {}", total);
|
||||||
|
|
||||||
|
bencher.iter(move || {
|
||||||
|
SigVerifyStage::discard_excess_packets(&mut batches, 10_000);
|
||||||
|
for batch in batches.iter_mut() {
|
||||||
|
for p in batch.packets.iter_mut() {
|
||||||
|
p.meta.discard = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
#[bench]
|
#[bench]
|
||||||
fn bench_sigverify_stage(bencher: &mut Bencher) {
|
fn bench_sigverify_stage(bencher: &mut Bencher) {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
|
@ -10,16 +10,14 @@ use crossbeam_channel::{SendError, Sender as CrossbeamSender};
|
|||||||
use solana_measure::measure::Measure;
|
use solana_measure::measure::Measure;
|
||||||
use solana_metrics::datapoint_debug;
|
use solana_metrics::datapoint_debug;
|
||||||
use solana_perf::packet::Packets;
|
use solana_perf::packet::Packets;
|
||||||
use solana_perf::perf_libs;
|
|
||||||
use solana_sdk::timing;
|
use solana_sdk::timing;
|
||||||
use solana_streamer::streamer::{self, PacketReceiver, StreamerError};
|
use solana_streamer::streamer::{self, PacketReceiver, StreamerError};
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::sync::mpsc::{Receiver, RecvTimeoutError};
|
use std::sync::mpsc::{Receiver, RecvTimeoutError};
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use std::thread::{self, Builder, JoinHandle};
|
use std::thread::{self, Builder, JoinHandle};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
const RECV_BATCH_MAX_CPU: usize = 1_000;
|
const MAX_SIGVERIFY_BATCH: usize = 10_000;
|
||||||
const RECV_BATCH_MAX_GPU: usize = 5_000;
|
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum SigVerifyServiceError {
|
pub enum SigVerifyServiceError {
|
||||||
@ -33,7 +31,7 @@ pub enum SigVerifyServiceError {
|
|||||||
type Result<T> = std::result::Result<T, SigVerifyServiceError>;
|
type Result<T> = std::result::Result<T, SigVerifyServiceError>;
|
||||||
|
|
||||||
pub struct SigVerifyStage {
|
pub struct SigVerifyStage {
|
||||||
thread_hdls: Vec<JoinHandle<()>>,
|
thread_hdl: JoinHandle<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait SigVerifier {
|
pub trait SigVerifier {
|
||||||
@ -57,49 +55,67 @@ impl SigVerifyStage {
|
|||||||
verified_sender: CrossbeamSender<Vec<Packets>>,
|
verified_sender: CrossbeamSender<Vec<Packets>>,
|
||||||
verifier: T,
|
verifier: T,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let thread_hdls = Self::verifier_services(packet_receiver, verified_sender, verifier);
|
let thread_hdl = Self::verifier_services(packet_receiver, verified_sender, verifier);
|
||||||
Self { thread_hdls }
|
Self { thread_hdl }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn discard_excess_packets(batches: &mut Vec<Packets>, max_packets: usize) {
|
||||||
|
let mut received_ips = HashMap::new();
|
||||||
|
for (batch_index, batch) in batches.iter().enumerate() {
|
||||||
|
for (packet_index, packets) in batch.packets.iter().enumerate() {
|
||||||
|
let e = received_ips
|
||||||
|
.entry(packets.meta.addr().ip())
|
||||||
|
.or_insert_with(Vec::new);
|
||||||
|
e.push((batch_index, packet_index));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let mut batch_len = 0;
|
||||||
|
while batch_len < max_packets {
|
||||||
|
for (_ip, indexes) in received_ips.iter_mut() {
|
||||||
|
if !indexes.is_empty() {
|
||||||
|
indexes.remove(0);
|
||||||
|
batch_len += 1;
|
||||||
|
if batch_len >= MAX_SIGVERIFY_BATCH {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (_addr, indexes) in received_ips {
|
||||||
|
for (batch_index, packet_index) in indexes {
|
||||||
|
batches[batch_index].packets[packet_index].meta.discard = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verifier<T: SigVerifier>(
|
fn verifier<T: SigVerifier>(
|
||||||
recvr: &Arc<Mutex<PacketReceiver>>,
|
recvr: &PacketReceiver,
|
||||||
sendr: &CrossbeamSender<Vec<Packets>>,
|
sendr: &CrossbeamSender<Vec<Packets>>,
|
||||||
id: usize,
|
|
||||||
verifier: &T,
|
verifier: &T,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let (batch, len, recv_time) = streamer::recv_batch(
|
let (mut batches, len, recv_time) = streamer::recv_batch(recvr)?;
|
||||||
&recvr.lock().expect("'recvr' lock in fn verifier"),
|
|
||||||
if perf_libs::api().is_some() {
|
|
||||||
RECV_BATCH_MAX_GPU
|
|
||||||
} else {
|
|
||||||
RECV_BATCH_MAX_CPU
|
|
||||||
},
|
|
||||||
)?;
|
|
||||||
|
|
||||||
let mut verify_batch_time = Measure::start("sigverify_batch_time");
|
let mut verify_batch_time = Measure::start("sigverify_batch_time");
|
||||||
let batch_len = batch.len();
|
let batches_len = batches.len();
|
||||||
debug!(
|
debug!("@{:?} verifier: verifying: {}", timing::timestamp(), len,);
|
||||||
"@{:?} verifier: verifying: {} id: {}",
|
if len > MAX_SIGVERIFY_BATCH {
|
||||||
timing::timestamp(),
|
Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH);
|
||||||
len,
|
}
|
||||||
id
|
sendr.send(verifier.verify_batch(batches))?;
|
||||||
);
|
|
||||||
sendr.send(verifier.verify_batch(batch))?;
|
|
||||||
verify_batch_time.stop();
|
verify_batch_time.stop();
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}",
|
"@{:?} verifier: done. batches: {} total verify time: {:?} verified: {} v/s {}",
|
||||||
timing::timestamp(),
|
timing::timestamp(),
|
||||||
batch_len,
|
batches_len,
|
||||||
verify_batch_time.as_ms(),
|
verify_batch_time.as_ms(),
|
||||||
id,
|
|
||||||
len,
|
len,
|
||||||
(len as f32 / verify_batch_time.as_s())
|
(len as f32 / verify_batch_time.as_s())
|
||||||
);
|
);
|
||||||
|
|
||||||
datapoint_debug!(
|
datapoint_debug!(
|
||||||
"sigverify_stage-total_verify_time",
|
"sigverify_stage-total_verify_time",
|
||||||
("num_batches", batch_len, i64),
|
("num_batches", batches_len, i64),
|
||||||
("num_packets", len, i64),
|
("num_packets", len, i64),
|
||||||
("verify_time_ms", verify_batch_time.as_ms(), i64),
|
("verify_time_ms", verify_batch_time.as_ms(), i64),
|
||||||
("recv_time", recv_time, i64),
|
("recv_time", recv_time, i64),
|
||||||
@ -109,16 +125,15 @@ impl SigVerifyStage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn verifier_service<T: SigVerifier + 'static + Send + Clone>(
|
fn verifier_service<T: SigVerifier + 'static + Send + Clone>(
|
||||||
packet_receiver: Arc<Mutex<PacketReceiver>>,
|
packet_receiver: PacketReceiver,
|
||||||
verified_sender: CrossbeamSender<Vec<Packets>>,
|
verified_sender: CrossbeamSender<Vec<Packets>>,
|
||||||
id: usize,
|
|
||||||
verifier: &T,
|
verifier: &T,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
let verifier = verifier.clone();
|
let verifier = verifier.clone();
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name(format!("solana-verifier-{}", id))
|
.name("solana-verifier".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
if let Err(e) = Self::verifier(&packet_receiver, &verified_sender, id, &verifier) {
|
if let Err(e) = Self::verifier(&packet_receiver, &verified_sender, &verifier) {
|
||||||
match e {
|
match e {
|
||||||
SigVerifyServiceError::Streamer(StreamerError::RecvTimeout(
|
SigVerifyServiceError::Streamer(StreamerError::RecvTimeout(
|
||||||
RecvTimeoutError::Disconnected,
|
RecvTimeoutError::Disconnected,
|
||||||
@ -140,19 +155,43 @@ impl SigVerifyStage {
|
|||||||
packet_receiver: PacketReceiver,
|
packet_receiver: PacketReceiver,
|
||||||
verified_sender: CrossbeamSender<Vec<Packets>>,
|
verified_sender: CrossbeamSender<Vec<Packets>>,
|
||||||
verifier: T,
|
verifier: T,
|
||||||
) -> Vec<JoinHandle<()>> {
|
) -> JoinHandle<()> {
|
||||||
let receiver = Arc::new(Mutex::new(packet_receiver));
|
Self::verifier_service(packet_receiver, verified_sender, &verifier)
|
||||||
(0..4)
|
|
||||||
.map(|id| {
|
|
||||||
Self::verifier_service(receiver.clone(), verified_sender.clone(), id, &verifier)
|
|
||||||
})
|
|
||||||
.collect()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn join(self) -> thread::Result<()> {
|
pub fn join(self) -> thread::Result<()> {
|
||||||
for thread_hdl in self.thread_hdls {
|
self.thread_hdl.join()
|
||||||
thread_hdl.join()?;
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use solana_perf::packet::Packet;
|
||||||
|
|
||||||
|
fn count_non_discard(packets: &[Packets]) -> usize {
|
||||||
|
packets
|
||||||
|
.iter()
|
||||||
|
.map(|pp| {
|
||||||
|
pp.packets
|
||||||
|
.iter()
|
||||||
|
.map(|p| if p.meta.discard { 0 } else { 1 })
|
||||||
|
.sum::<usize>()
|
||||||
|
})
|
||||||
|
.sum::<usize>()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_packet_discard() {
|
||||||
|
solana_logger::setup();
|
||||||
|
let mut p = Packets::default();
|
||||||
|
p.packets.resize(10, Packet::default());
|
||||||
|
p.packets[3].meta.addr = [1u16; 8];
|
||||||
|
let mut packets = vec![p];
|
||||||
|
let max = 3;
|
||||||
|
SigVerifyStage::discard_excess_packets(&mut packets, max);
|
||||||
|
assert_eq!(count_non_discard(&packets), max);
|
||||||
|
assert!(!packets[0].packets[0].meta.discard);
|
||||||
|
assert!(!packets[0].packets[3].meta.discard);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -126,7 +126,7 @@ fn recv_send(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn recv_batch(recvr: &PacketReceiver, max_batch: usize) -> Result<(Vec<Packets>, usize, u64)> {
|
pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<Packets>, usize, u64)> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let msgs = recvr.recv_timeout(timer)?;
|
let msgs = recvr.recv_timeout(timer)?;
|
||||||
let recv_start = Instant::now();
|
let recv_start = Instant::now();
|
||||||
@ -137,9 +137,6 @@ pub fn recv_batch(recvr: &PacketReceiver, max_batch: usize) -> Result<(Vec<Packe
|
|||||||
trace!("got more msgs");
|
trace!("got more msgs");
|
||||||
len += more.packets.len();
|
len += more.packets.len();
|
||||||
batch.push(more);
|
batch.push(more);
|
||||||
if len > max_batch {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
trace!("batch len {}", batch.len());
|
trace!("batch len {}", batch.len());
|
||||||
Ok((batch, len, duration_as_ms(&recv_start.elapsed())))
|
Ok((batch, len, duration_as_ms(&recv_start.elapsed())))
|
||||||
|
Reference in New Issue
Block a user