improves sigverify discard_excess_packets performance (backport #22577) (#22580)

* improves sigverify discard_excess_packets performance (#22577)

As shown by the added benchmark, current code does worse if there is a
spam address plus a lot of unique addresses.

on current master:
test bench_packet_discard_many_senders  ... bench:   1,997,960 ns/iter (+/- 103,715)
test bench_packet_discard_mixed_senders ... bench:  14,256,116 ns/iter (+/- 534,865)
test bench_packet_discard_single_sender ... bench:   1,306,809 ns/iter (+/- 61,992)

with this commit:
test bench_packet_discard_many_senders  ... bench:   1,644,025 ns/iter (+/- 83,715)
test bench_packet_discard_mixed_senders ... bench:   1,089,789 ns/iter (+/- 86,324)
test bench_packet_discard_single_sender ... bench:     955,234 ns/iter (+/- 55,953)

(cherry picked from commit dcf44d2523)

# Conflicts:
#	core/src/sigverify_stage.rs

* removes mergify merge conflicts

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
This commit is contained in:
mergify[bot]
2022-01-19 20:14:10 +00:00
committed by GitHub
parent c9df037dae
commit f77ea5f324
2 changed files with 63 additions and 28 deletions

View File

@ -52,11 +52,16 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) {
bencher.iter(move || { bencher.iter(move || {
SigVerifyStage::discard_excess_packets(&mut batches, 10_000); SigVerifyStage::discard_excess_packets(&mut batches, 10_000);
let mut num_packets = 0;
for batch in batches.iter_mut() { for batch in batches.iter_mut() {
for p in batch.packets.iter_mut() { for p in batch.packets.iter_mut() {
if !p.meta.discard() {
num_packets += 1;
}
p.meta.set_discard(false); p.meta.set_discard(false);
} }
} }
assert_eq!(num_packets, 10_000);
}); });
} }
@ -70,6 +75,43 @@ fn bench_packet_discard_single_sender(bencher: &mut Bencher) {
run_bench_packet_discard(1, bencher); run_bench_packet_discard(1, bencher);
} }
#[bench]
fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) {
const SIZE: usize = 30 * 1000;
const CHUNK_SIZE: usize = 1024;
fn new_rand_addr<R: Rng>(rng: &mut R) -> std::net::IpAddr {
let mut addr = [0u16; 8];
rng.fill(&mut addr);
std::net::IpAddr::from(addr)
}
let mut rng = thread_rng();
let mut batches = to_packet_batches(&vec![test_tx(); SIZE], CHUNK_SIZE);
let spam_addr = new_rand_addr(&mut rng);
for batch in batches.iter_mut() {
for packet in batch.packets.iter_mut() {
// One spam address, ~1000 unique addresses.
packet.meta.addr = if rng.gen_ratio(1, 30) {
new_rand_addr(&mut rng)
} else {
spam_addr
}
}
}
bencher.iter(move || {
SigVerifyStage::discard_excess_packets(&mut batches, 10_000);
let mut num_packets = 0;
for batch in batches.iter_mut() {
for packet in batch.packets.iter_mut() {
if !packet.meta.discard() {
num_packets += 1;
}
packet.meta.set_discard(false);
}
}
assert_eq!(num_packets, 10_000);
});
}
#[bench] #[bench]
fn bench_sigverify_stage(bencher: &mut Bencher) { fn bench_sigverify_stage(bencher: &mut Bencher) {
solana_logger::setup(); solana_logger::setup();

View File

@ -8,12 +8,12 @@
use { use {
crate::sigverify, crate::sigverify,
crossbeam_channel::{SendError, Sender as CrossbeamSender}, crossbeam_channel::{SendError, Sender as CrossbeamSender},
itertools::Itertools,
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_perf::packet::PacketBatch, solana_perf::packet::PacketBatch,
solana_sdk::timing, solana_sdk::timing,
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError}, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError},
std::{ std::{
collections::{HashMap, VecDeque},
sync::mpsc::{Receiver, RecvTimeoutError}, sync::mpsc::{Receiver, RecvTimeoutError},
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
time::Instant, time::Instant,
@ -162,34 +162,27 @@ impl SigVerifyStage {
Self { thread_hdl } Self { thread_hdl }
} }
pub fn discard_excess_packets(batches: &mut Vec<PacketBatch>, max_packets: usize) { pub fn discard_excess_packets(batches: &mut [PacketBatch], mut max_packets: usize) {
let mut received_ips = HashMap::new(); // Group packets by their incoming IP address.
for (batch_index, batch) in batches.iter().enumerate() { let mut addrs = batches
for (packet_index, packets) in batch.packets.iter().enumerate() { .iter_mut()
let e = received_ips .rev()
.entry(packets.meta.addr().ip()) .flat_map(|batch| batch.packets.iter_mut().rev())
.or_insert_with(VecDeque::new); .map(|packet| (packet.meta.addr, packet))
e.push_back((batch_index, packet_index)); .into_group_map();
} // Allocate max_packets evenly across addresses.
while max_packets > 0 && !addrs.is_empty() {
let num_addrs = addrs.len();
addrs.retain(|_, packets| {
let cap = (max_packets + num_addrs - 1) / num_addrs;
max_packets -= packets.len().min(cap);
packets.truncate(packets.len().saturating_sub(cap));
!packets.is_empty()
});
} }
let mut batch_len = 0; // Discard excess packets from each address.
while batch_len < max_packets { for packet in addrs.into_values().flatten() {
for (_ip, indexes) in received_ips.iter_mut() { packet.meta.set_discard(true);
if !indexes.is_empty() {
indexes.pop_front();
batch_len += 1;
if batch_len >= max_packets {
break;
}
}
}
}
for (_addr, indexes) in received_ips {
for (batch_index, packet_index) in indexes {
batches[batch_index].packets[packet_index]
.meta
.set_discard(true);
}
} }
} }