sigverify -- dedupe bloom filter too slow followups

This commit is contained in:
Anatoly Yakovenko
2022-01-21 11:43:22 -08:00
committed by Trent Nelson
parent b354dae249
commit 620a80b581
3 changed files with 20 additions and 21 deletions

View File

@ -7,6 +7,7 @@
use { use {
crate::sigverify, crate::sigverify,
core::time::Duration,
crossbeam_channel::{SendError, Sender as CrossbeamSender}, crossbeam_channel::{SendError, Sender as CrossbeamSender},
itertools::Itertools, itertools::Itertools,
solana_measure::measure::Measure, solana_measure::measure::Measure,
@ -288,12 +289,12 @@ impl SigVerifyStage {
let verifier = verifier.clone(); let verifier = verifier.clone();
let mut stats = SigVerifierStats::default(); let mut stats = SigVerifierStats::default();
let mut last_print = Instant::now(); let mut last_print = Instant::now();
const MAX_DEDUPER_AGE_MS: u64 = 2_000; const MAX_DEDUPER_AGE: Duration = Duration::from_secs(2);
const MAX_DEDUPER_ITEMS: u32 = 1_000_000; const MAX_DEDUPER_ITEMS: u32 = 1_000_000;
Builder::new() Builder::new()
.name("solana-verifier".to_string()) .name("solana-verifier".to_string())
.spawn(move || { .spawn(move || {
let mut deduper = Deduper::new(MAX_DEDUPER_ITEMS, MAX_DEDUPER_AGE_MS); let mut deduper = Deduper::new(MAX_DEDUPER_ITEMS, MAX_DEDUPER_AGE);
loop { loop {
deduper.reset(); deduper.reset();
if let Err(e) = Self::verifier( if let Err(e) = Self::verifier(

View File

@ -9,6 +9,7 @@ use {
packet::{to_packet_batches, PacketBatch}, packet::{to_packet_batches, PacketBatch},
sigverify, sigverify,
}, },
std::time::Duration,
test::Bencher, test::Bencher,
}; };
@ -23,7 +24,7 @@ fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) { fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) {
// verify packets // verify packets
let mut deduper = sigverify::Deduper::new(1_000_000, 2_000); let mut deduper = sigverify::Deduper::new(1_000_000, Duration::from_millis(2_000));
bencher.iter(|| { bencher.iter(|| {
let _ans = deduper.dedup_packets(&mut batches); let _ans = deduper.dedup_packets(&mut batches);
deduper.reset(); deduper.reset();
@ -111,7 +112,7 @@ fn bench_dedup_baseline(bencher: &mut Bencher) {
#[bench] #[bench]
#[ignore] #[ignore]
fn bench_dedup_reset(bencher: &mut Bencher) { fn bench_dedup_reset(bencher: &mut Bencher) {
let mut deduper = sigverify::Deduper::new(1_000_000, 0); let mut deduper = sigverify::Deduper::new(1_000_000, Duration::from_millis(0));
bencher.iter(|| { bencher.iter(|| {
deduper.reset(); deduper.reset();
}); });

View File

@ -432,7 +432,7 @@ pub struct Deduper {
} }
impl Deduper { impl Deduper {
pub fn new(size: u32, max_age_ms: u64) -> Self { pub fn new(size: u32, max_age: Duration) -> Self {
let mut filter: Vec<AtomicU64> = Vec::with_capacity(size as usize); let mut filter: Vec<AtomicU64> = Vec::with_capacity(size as usize);
filter.resize_with(size as usize, Default::default); filter.resize_with(size as usize, Default::default);
let seed = thread_rng().gen(); let seed = thread_rng().gen();
@ -440,7 +440,7 @@ impl Deduper {
filter, filter,
seed, seed,
age: Instant::now(), age: Instant::now(),
max_age: Duration::from_millis(max_age_ms), max_age,
saturated: AtomicBool::new(false), saturated: AtomicBool::new(false),
} }
} }
@ -460,10 +460,10 @@ impl Deduper {
} }
} }
fn dedup_packet(&self, count: &AtomicU64, packet: &mut Packet) { fn dedup_packet(&self, packet: &mut Packet) -> u64 {
// If this packet was already marked as discard, drop it // If this packet was already marked as discard, drop it
if packet.meta.discard() { if packet.meta.discard() {
return; return 0;
} }
let mut hasher = AHasher::new_with_keys(self.seed.0, self.seed.1); let mut hasher = AHasher::new_with_keys(self.seed.0, self.seed.1);
hasher.write(&packet.data[0..packet.meta.size]); hasher.write(&packet.data[0..packet.meta.size]);
@ -479,19 +479,16 @@ impl Deduper {
} }
if hash == prev & hash { if hash == prev & hash {
packet.meta.set_discard(true); packet.meta.set_discard(true);
count.fetch_add(1, Ordering::Relaxed); return 1;
} }
0
} }
pub fn dedup_packets(&self, batches: &mut [PacketBatch]) -> u64 { pub fn dedup_packets(&self, batches: &mut [PacketBatch]) -> u64 {
let count = AtomicU64::new(0); batches
batches.iter_mut().for_each(|batch| {
batch
.packets
.iter_mut() .iter_mut()
.for_each(|p| self.dedup_packet(&count, p)) .flat_map(|batch| batch.packets.iter_mut().map(|p| self.dedup_packet(p)))
}); .sum()
count.load(Ordering::Relaxed)
} }
} }
@ -1346,14 +1343,14 @@ mod tests {
let mut batches = let mut batches =
to_packet_batches(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 128); to_packet_batches(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 128);
let packet_count = sigverify::count_packets_in_batches(&batches); let packet_count = sigverify::count_packets_in_batches(&batches);
let filter = Deduper::new(1_000_000, 0); let filter = Deduper::new(1_000_000, Duration::from_millis(0));
let discard = filter.dedup_packets(&mut batches) as usize; let discard = filter.dedup_packets(&mut batches) as usize;
assert_eq!(packet_count, discard + 1); assert_eq!(packet_count, discard + 1);
} }
#[test] #[test]
fn test_dedup_diff() { fn test_dedup_diff() {
let mut filter = Deduper::new(1_000_000, 0); let mut filter = Deduper::new(1_000_000, Duration::from_millis(0));
let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128); let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
let discard = filter.dedup_packets(&mut batches) as usize; let discard = filter.dedup_packets(&mut batches) as usize;
@ -1368,7 +1365,7 @@ mod tests {
#[test] #[test]
#[ignore] #[ignore]
fn test_dedup_saturated() { fn test_dedup_saturated() {
let filter = Deduper::new(1_000_000, 0); let filter = Deduper::new(1_000_000, Duration::from_millis(0));
let mut discard = 0; let mut discard = 0;
assert!(!filter.saturated.load(Ordering::Relaxed)); assert!(!filter.saturated.load(Ordering::Relaxed));
for i in 0..1000 { for i in 0..1000 {
@ -1385,7 +1382,7 @@ mod tests {
#[test] #[test]
fn test_dedup_false_positive() { fn test_dedup_false_positive() {
let filter = Deduper::new(1_000_000, 0); let filter = Deduper::new(1_000_000, Duration::from_millis(0));
let mut discard = 0; let mut discard = 0;
for i in 0..10 { for i in 0..10 {
let mut batches = let mut batches =