shrink batches when over 80% of the space is wasted (backport #23066) (#23189)

* shrink batches when over 80% of the space is wasted (#23066)

* shrink batches when over 80% of the space is wasted

(cherry picked from commit 83d31c9e65)

# Conflicts:
#	core/benches/sigverify_stage.rs
#	core/src/sigverify_stage.rs
#	perf/src/sigverify.rs

* fixup!

Co-authored-by: anatoly yakovenko <anatoly@solana.com>
This commit is contained in:
mergify[bot]
2022-02-16 19:30:16 +00:00
committed by GitHub
parent 41bbc11a46
commit 3fd78ac6ea
4 changed files with 458 additions and 22 deletions

View File

@ -12,7 +12,7 @@ use {
itertools::Itertools,
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_perf::sigverify::Deduper,
solana_perf::sigverify::{count_valid_packets, shrink_batches, Deduper},
solana_sdk::timing,
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError},
std::{
@ -59,6 +59,9 @@ struct SigVerifierStats {
total_packets: usize,
total_dedup: usize,
total_excess_fail: usize,
total_shrink_time: usize,
total_shrinks: usize,
total_valid_packets: usize,
}
impl SigVerifierStats {
@ -167,6 +170,9 @@ impl SigVerifierStats {
("total_packets", self.total_packets, i64),
("total_dedup", self.total_dedup, i64),
("total_excess_fail", self.total_excess_fail, i64),
("total_shrink_time", self.total_shrink_time, i64),
("total_shrinks", self.total_shrinks, i64),
("total_valid_packets", self.total_valid_packets, i64),
);
}
}
@ -242,7 +248,20 @@ impl SigVerifyStage {
discard_time.stop();
let mut verify_batch_time = Measure::start("sigverify_batch_time");
let batches = verifier.verify_batches(batches);
let mut batches = verifier.verify_batches(batches);
verify_batch_time.stop();
let mut shrink_time = Measure::start("sigverify_shrink_time");
let num_valid_packets = count_valid_packets(&batches);
let start_len = batches.len();
const MAX_EMPTY_BATCH_RATIO: usize = 4;
if num_packets > num_valid_packets.saturating_mul(MAX_EMPTY_BATCH_RATIO) {
let valid = shrink_batches(&mut batches);
batches.truncate(valid);
}
let total_shrinks = start_len.saturating_sub(batches.len());
shrink_time.stop();
sendr.send(batches)?;
verify_batch_time.stop();
@ -276,7 +295,10 @@ impl SigVerifyStage {
stats.total_batches += batches_len;
stats.total_packets += num_packets;
stats.total_dedup += dedup_fail;
stats.total_valid_packets += num_valid_packets;
stats.total_excess_fail += excess_fail;
stats.total_shrink_time += shrink_time.as_us() as usize;
stats.total_shrinks += total_shrinks;
Ok(())
}
@ -342,6 +364,12 @@ impl SigVerifyStage {
#[cfg(test)]
mod tests {
use crate::sigverify::TransactionSigVerifier;
use crate::sigverify_stage::timing::duration_as_ms;
use crossbeam_channel::unbounded;
use solana_perf::packet::to_packet_batches;
use solana_perf::test_tx::test_tx;
use std::sync::mpsc::channel;
use {super::*, solana_perf::packet::Packet};
fn count_non_discard(packet_batches: &[PacketBatch]) -> usize {
@ -370,4 +398,58 @@ mod tests {
assert!(!batches[0].packets[0].meta.discard());
assert!(!batches[0].packets[3].meta.discard());
}
fn gen_batches(use_same_tx: bool) -> Vec<PacketBatch> {
let len = 4096;
let chunk_size = 1024;
if use_same_tx {
let tx = test_tx();
to_packet_batches(&vec![tx; len], chunk_size)
} else {
let txs: Vec<_> = (0..len).map(|_| test_tx()).collect();
to_packet_batches(&txs, chunk_size)
}
}
#[test]
fn test_sigverify_stage() {
solana_logger::setup();
trace!("start");
let (packet_s, packet_r) = channel();
let (verified_s, verified_r) = unbounded();
let verifier = TransactionSigVerifier::default();
let stage = SigVerifyStage::new(packet_r, verified_s, verifier);
let use_same_tx = true;
let now = Instant::now();
let mut batches = gen_batches(use_same_tx);
trace!(
"starting... generation took: {} ms batches: {}",
duration_as_ms(&now.elapsed()),
batches.len()
);
let mut sent_len = 0;
for _ in 0..batches.len() {
if let Some(batch) = batches.pop() {
sent_len += batch.packets.len();
packet_s.send(batch).unwrap();
}
}
let mut received = 0;
trace!("sent: {}", sent_len);
loop {
if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) {
while let Some(v) = verifieds.pop() {
received += v.packets.len();
batches.push(v);
}
if use_same_tx || received >= sent_len {
break;
}
}
}
trace!("received: {}", received);
drop(packet_s);
stage.join().unwrap();
}
}