diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 75542050ff..412a399744 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -7,6 +7,7 @@ use { crate::sigverify, + core::time::Duration, crossbeam_channel::{SendError, Sender as CrossbeamSender}, itertools::Itertools, solana_measure::measure::Measure, @@ -288,12 +289,12 @@ impl SigVerifyStage { let verifier = verifier.clone(); let mut stats = SigVerifierStats::default(); let mut last_print = Instant::now(); - const MAX_DEDUPER_AGE_MS: u64 = 2_000; + const MAX_DEDUPER_AGE: Duration = Duration::from_secs(2); const MAX_DEDUPER_ITEMS: u32 = 1_000_000; Builder::new() .name("solana-verifier".to_string()) .spawn(move || { - let mut deduper = Deduper::new(MAX_DEDUPER_ITEMS, MAX_DEDUPER_AGE_MS); + let mut deduper = Deduper::new(MAX_DEDUPER_ITEMS, MAX_DEDUPER_AGE); loop { deduper.reset(); if let Err(e) = Self::verifier( diff --git a/perf/benches/dedup.rs b/perf/benches/dedup.rs index cba4eca82c..c2d72f0839 100644 --- a/perf/benches/dedup.rs +++ b/perf/benches/dedup.rs @@ -9,6 +9,7 @@ use { packet::{to_packet_batches, PacketBatch}, sigverify, }, + std::time::Duration, test::Bencher, }; @@ -23,7 +24,7 @@ fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec { fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec) { // verify packets - let mut deduper = sigverify::Deduper::new(1_000_000, 2_000); + let mut deduper = sigverify::Deduper::new(1_000_000, Duration::from_millis(2_000)); bencher.iter(|| { let _ans = deduper.dedup_packets(&mut batches); deduper.reset(); @@ -111,7 +112,7 @@ fn bench_dedup_baseline(bencher: &mut Bencher) { #[bench] #[ignore] fn bench_dedup_reset(bencher: &mut Bencher) { - let mut deduper = sigverify::Deduper::new(1_000_000, 0); + let mut deduper = sigverify::Deduper::new(1_000_000, Duration::from_millis(0)); bencher.iter(|| { deduper.reset(); }); diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 382fe4b950..daf0026145 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -432,7 +432,7 @@ pub struct Deduper { } impl Deduper { - pub fn new(size: u32, max_age_ms: u64) -> Self { + pub fn new(size: u32, max_age: Duration) -> Self { let mut filter: Vec = Vec::with_capacity(size as usize); filter.resize_with(size as usize, Default::default); let seed = thread_rng().gen(); @@ -440,7 +440,7 @@ impl Deduper { filter, seed, age: Instant::now(), - max_age: Duration::from_millis(max_age_ms), + max_age, saturated: AtomicBool::new(false), } } @@ -460,10 +460,10 @@ impl Deduper { } } - fn dedup_packet(&self, count: &AtomicU64, packet: &mut Packet) { + fn dedup_packet(&self, packet: &mut Packet) -> u64 { // If this packet was already marked as discard, drop it if packet.meta.discard() { - return; + return 0; } let mut hasher = AHasher::new_with_keys(self.seed.0, self.seed.1); hasher.write(&packet.data[0..packet.meta.size]); @@ -479,19 +479,16 @@ impl Deduper { } if hash == prev & hash { packet.meta.set_discard(true); - count.fetch_add(1, Ordering::Relaxed); + return 1; } + 0 } pub fn dedup_packets(&self, batches: &mut [PacketBatch]) -> u64 { - let count = AtomicU64::new(0); - batches.iter_mut().for_each(|batch| { - batch - .packets - .iter_mut() - .for_each(|p| self.dedup_packet(&count, p)) - }); - count.load(Ordering::Relaxed) + batches + .iter_mut() + .flat_map(|batch| batch.packets.iter_mut().map(|p| self.dedup_packet(p))) + .sum() } } @@ -1346,14 +1343,14 @@ mod tests { let mut batches = to_packet_batches(&std::iter::repeat(tx).take(1024).collect::>(), 128); let packet_count = sigverify::count_packets_in_batches(&batches); - let filter = Deduper::new(1_000_000, 0); + let filter = Deduper::new(1_000_000, Duration::from_millis(0)); let discard = filter.dedup_packets(&mut batches) as usize; assert_eq!(packet_count, discard + 1); } #[test] fn test_dedup_diff() { - let mut filter = Deduper::new(1_000_000, 0); + let mut filter = Deduper::new(1_000_000, Duration::from_millis(0)); let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::>(), 128); let discard = filter.dedup_packets(&mut batches) as usize; @@ -1368,7 +1365,7 @@ mod tests { #[test] #[ignore] fn test_dedup_saturated() { - let filter = Deduper::new(1_000_000, 0); + let filter = Deduper::new(1_000_000, Duration::from_millis(0)); let mut discard = 0; assert!(!filter.saturated.load(Ordering::Relaxed)); for i in 0..1000 { @@ -1385,7 +1382,7 @@ mod tests { #[test] fn test_dedup_false_positive() { - let filter = Deduper::new(1_000_000, 0); + let filter = Deduper::new(1_000_000, Duration::from_millis(0)); let mut discard = 0; for i in 0..10 { let mut batches =