shrink batches when over 80% of the space is wasted (#23066)

* shrink batches when over 80% of the space is wasted
This commit is contained in:
anatoly yakovenko
2022-02-16 08:18:17 -08:00
committed by GitHub
parent 115d71536b
commit 83d31c9e65
4 changed files with 457 additions and 22 deletions

View File

@ -9,7 +9,7 @@ use {
log::*,
rand::{thread_rng, Rng},
solana_core::{sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage},
solana_perf::{packet::to_packet_batches, test_tx::test_tx},
solana_perf::{packet::to_packet_batches, packet::PacketBatch, test_tx::test_tx},
solana_sdk::{
hash::Hash,
signature::{Keypair, Signer},
@ -109,19 +109,10 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) {
});
}
#[bench]
fn bench_sigverify_stage(bencher: &mut Bencher) {
solana_logger::setup();
let (packet_s, packet_r) = unbounded();
let (verified_s, verified_r) = unbounded();
let verifier = TransactionSigVerifier::default();
let stage = SigVerifyStage::new(packet_r, verified_s, verifier);
let now = Instant::now();
fn gen_batches(use_same_tx: bool) -> Vec<PacketBatch> {
let len = 4096;
let use_same_tx = true;
let chunk_size = 1024;
let mut batches = if use_same_tx {
if use_same_tx {
let tx = test_tx();
to_packet_batches(&vec![tx; len], chunk_size)
} else {
@ -139,14 +130,28 @@ fn bench_sigverify_stage(bencher: &mut Bencher) {
})
.collect();
to_packet_batches(&txs, chunk_size)
};
}
}
trace!(
"starting... generation took: {} ms batches: {}",
duration_as_ms(&now.elapsed()),
batches.len()
);
#[bench]
fn bench_sigverify_stage(bencher: &mut Bencher) {
solana_logger::setup();
trace!("start");
let (packet_s, packet_r) = unbounded();
let (verified_s, verified_r) = unbounded();
let verifier = TransactionSigVerifier::default();
let stage = SigVerifyStage::new(packet_r, verified_s, verifier);
let use_same_tx = true;
bencher.iter(move || {
let now = Instant::now();
let mut batches = gen_batches(use_same_tx);
trace!(
"starting... generation took: {} ms batches: {}",
duration_as_ms(&now.elapsed()),
batches.len()
);
let mut sent_len = 0;
for _ in 0..batches.len() {
if let Some(batch) = batches.pop() {
@ -162,7 +167,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) {
received += v.packets.len();
batches.push(v);
}
if received >= sent_len {
if use_same_tx || received >= sent_len {
break;
}
}

View File

@ -12,7 +12,7 @@ use {
itertools::Itertools,
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_perf::sigverify::Deduper,
solana_perf::sigverify::{count_valid_packets, shrink_batches, Deduper},
solana_sdk::timing,
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError},
std::{
@ -58,6 +58,9 @@ struct SigVerifierStats {
total_packets: usize,
total_dedup: usize,
total_excess_fail: usize,
total_shrink_time: usize,
total_shrinks: usize,
total_valid_packets: usize,
}
impl SigVerifierStats {
@ -166,6 +169,9 @@ impl SigVerifierStats {
("total_packets", self.total_packets, i64),
("total_dedup", self.total_dedup, i64),
("total_excess_fail", self.total_excess_fail, i64),
("total_shrink_time", self.total_shrink_time, i64),
("total_shrinks", self.total_shrinks, i64),
("total_valid_packets", self.total_valid_packets, i64),
);
}
}
@ -248,7 +254,20 @@ impl SigVerifyStage {
discard_time.stop();
let mut verify_batch_time = Measure::start("sigverify_batch_time");
let batches = verifier.verify_batches(batches, num_valid_packets);
let mut batches = verifier.verify_batches(batches, num_valid_packets);
verify_batch_time.stop();
let mut shrink_time = Measure::start("sigverify_shrink_time");
let num_valid_packets = count_valid_packets(&batches);
let start_len = batches.len();
const MAX_EMPTY_BATCH_RATIO: usize = 4;
if num_packets > num_valid_packets.saturating_mul(MAX_EMPTY_BATCH_RATIO) {
let valid = shrink_batches(&mut batches);
batches.truncate(valid);
}
let total_shrinks = start_len.saturating_sub(batches.len());
shrink_time.stop();
sendr.send(batches)?;
verify_batch_time.stop();
@ -282,7 +301,10 @@ impl SigVerifyStage {
stats.total_batches += batches_len;
stats.total_packets += num_packets;
stats.total_dedup += dedup_fail;
stats.total_valid_packets += num_valid_packets;
stats.total_excess_fail += excess_fail;
stats.total_shrink_time += shrink_time.as_us() as usize;
stats.total_shrinks += total_shrinks;
Ok(())
}
@ -348,6 +370,11 @@ impl SigVerifyStage {
#[cfg(test)]
mod tests {
use crate::sigverify::TransactionSigVerifier;
use crate::sigverify_stage::timing::duration_as_ms;
use crossbeam_channel::unbounded;
use solana_perf::packet::to_packet_batches;
use solana_perf::test_tx::test_tx;
use {super::*, solana_perf::packet::Packet};
fn count_non_discard(packet_batches: &[PacketBatch]) -> usize {
@ -379,4 +406,58 @@ mod tests {
assert!(batches[0].packets[3].meta.discard());
assert!(!batches[0].packets[4].meta.discard());
}
fn gen_batches(use_same_tx: bool) -> Vec<PacketBatch> {
let len = 4096;
let chunk_size = 1024;
if use_same_tx {
let tx = test_tx();
to_packet_batches(&vec![tx; len], chunk_size)
} else {
let txs: Vec<_> = (0..len).map(|_| test_tx()).collect();
to_packet_batches(&txs, chunk_size)
}
}
#[test]
fn test_sigverify_stage() {
solana_logger::setup();
trace!("start");
let (packet_s, packet_r) = unbounded();
let (verified_s, verified_r) = unbounded();
let verifier = TransactionSigVerifier::default();
let stage = SigVerifyStage::new(packet_r, verified_s, verifier);
let use_same_tx = true;
let now = Instant::now();
let mut batches = gen_batches(use_same_tx);
trace!(
"starting... generation took: {} ms batches: {}",
duration_as_ms(&now.elapsed()),
batches.len()
);
let mut sent_len = 0;
for _ in 0..batches.len() {
if let Some(batch) = batches.pop() {
sent_len += batch.packets.len();
packet_s.send(batch).unwrap();
}
}
let mut received = 0;
trace!("sent: {}", sent_len);
loop {
if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) {
while let Some(v) = verifieds.pop() {
received += v.packets.len();
batches.push(v);
}
if use_same_tx || received >= sent_len {
break;
}
}
}
trace!("received: {}", received);
drop(packet_s);
stage.join().unwrap();
}
}