From f77ea5f324ca879adc81691a93bbc4f6ceffae4f Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 19 Jan 2022 20:14:10 +0000 Subject: [PATCH] improves sigverify discard_excess_packets performance (backport #22577) (#22580) * improves sigverify discard_excess_packets performance (#22577) As shown by the added benchmark, current code does worse if there is a spam address plus a lot of unique addresses. on current master: test bench_packet_discard_many_senders ... bench: 1,997,960 ns/iter (+/- 103,715) test bench_packet_discard_mixed_senders ... bench: 14,256,116 ns/iter (+/- 534,865) test bench_packet_discard_single_sender ... bench: 1,306,809 ns/iter (+/- 61,992) with this commit: test bench_packet_discard_many_senders ... bench: 1,644,025 ns/iter (+/- 83,715) test bench_packet_discard_mixed_senders ... bench: 1,089,789 ns/iter (+/- 86,324) test bench_packet_discard_single_sender ... bench: 955,234 ns/iter (+/- 55,953) (cherry picked from commit dcf44d252355178f1c308bb99657ee159ffbfd09) # Conflicts: # core/src/sigverify_stage.rs * removes mergify merge conflicts Co-authored-by: behzad nouri --- core/benches/sigverify_stage.rs | 42 ++++++++++++++++++++++++++++ core/src/sigverify_stage.rs | 49 ++++++++++++++------------------- 2 files changed, 63 insertions(+), 28 deletions(-) diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index 7ea7cb4b37..a0ae461079 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -52,11 +52,16 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) { bencher.iter(move || { SigVerifyStage::discard_excess_packets(&mut batches, 10_000); + let mut num_packets = 0; for batch in batches.iter_mut() { for p in batch.packets.iter_mut() { + if !p.meta.discard() { + num_packets += 1; + } p.meta.set_discard(false); } } + assert_eq!(num_packets, 10_000); }); } @@ -70,6 +75,43 @@ fn bench_packet_discard_single_sender(bencher: &mut Bencher) { run_bench_packet_discard(1, bencher); } +#[bench] +fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) { + const SIZE: usize = 30 * 1000; + const CHUNK_SIZE: usize = 1024; + fn new_rand_addr(rng: &mut R) -> std::net::IpAddr { + let mut addr = [0u16; 8]; + rng.fill(&mut addr); + std::net::IpAddr::from(addr) + } + let mut rng = thread_rng(); + let mut batches = to_packet_batches(&vec![test_tx(); SIZE], CHUNK_SIZE); + let spam_addr = new_rand_addr(&mut rng); + for batch in batches.iter_mut() { + for packet in batch.packets.iter_mut() { + // One spam address, ~1000 unique addresses. + packet.meta.addr = if rng.gen_ratio(1, 30) { + new_rand_addr(&mut rng) + } else { + spam_addr + } + } + } + bencher.iter(move || { + SigVerifyStage::discard_excess_packets(&mut batches, 10_000); + let mut num_packets = 0; + for batch in batches.iter_mut() { + for packet in batch.packets.iter_mut() { + if !packet.meta.discard() { + num_packets += 1; + } + packet.meta.set_discard(false); + } + } + assert_eq!(num_packets, 10_000); + }); +} + #[bench] fn bench_sigverify_stage(bencher: &mut Bencher) { solana_logger::setup(); diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 005d1a46e7..ae121f9a45 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -8,12 +8,12 @@ use { crate::sigverify, crossbeam_channel::{SendError, Sender as CrossbeamSender}, + itertools::Itertools, solana_measure::measure::Measure, solana_perf::packet::PacketBatch, solana_sdk::timing, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError}, std::{ - collections::{HashMap, VecDeque}, sync::mpsc::{Receiver, RecvTimeoutError}, thread::{self, Builder, JoinHandle}, time::Instant, @@ -162,34 +162,27 @@ impl SigVerifyStage { Self { thread_hdl } } - pub fn discard_excess_packets(batches: &mut Vec, max_packets: usize) { - let mut received_ips = HashMap::new(); - for (batch_index, batch) in batches.iter().enumerate() { - for (packet_index, packets) in batch.packets.iter().enumerate() { - let e = received_ips - .entry(packets.meta.addr().ip()) - .or_insert_with(VecDeque::new); - e.push_back((batch_index, packet_index)); - } + pub fn discard_excess_packets(batches: &mut [PacketBatch], mut max_packets: usize) { + // Group packets by their incoming IP address. + let mut addrs = batches + .iter_mut() + .rev() + .flat_map(|batch| batch.packets.iter_mut().rev()) + .map(|packet| (packet.meta.addr, packet)) + .into_group_map(); + // Allocate max_packets evenly across addresses. + while max_packets > 0 && !addrs.is_empty() { + let num_addrs = addrs.len(); + addrs.retain(|_, packets| { + let cap = (max_packets + num_addrs - 1) / num_addrs; + max_packets -= packets.len().min(cap); + packets.truncate(packets.len().saturating_sub(cap)); + !packets.is_empty() + }); } - let mut batch_len = 0; - while batch_len < max_packets { - for (_ip, indexes) in received_ips.iter_mut() { - if !indexes.is_empty() { - indexes.pop_front(); - batch_len += 1; - if batch_len >= max_packets { - break; - } - } - } - } - for (_addr, indexes) in received_ips { - for (batch_index, packet_index) in indexes { - batches[batch_index].packets[packet_index] - .meta - .set_discard(true); - } + // Discard excess packets from each address. + for packet in addrs.into_values().flatten() { + packet.meta.set_discard(true); } }