* Speed up packet dedup and fix benches
* fix tests
* allow int arithmetic in bench
(cherry picked from commit a2d251ce1e)
Co-authored-by: Justin Starry <justin@solana.com>
			
			
This commit is contained in:
		@@ -174,11 +174,16 @@ impl<T: BloomHashIndex> AtomicBloom<T> {
 | 
			
		||||
        (index as usize, mask)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn add(&self, key: &T) {
 | 
			
		||||
    /// Adds an item to the bloom filter and returns true if the item
 | 
			
		||||
    /// was not in the filter before.
 | 
			
		||||
    pub fn add(&self, key: &T) -> bool {
 | 
			
		||||
        let mut added = false;
 | 
			
		||||
        for k in &self.keys {
 | 
			
		||||
            let (index, mask) = self.pos(key, *k);
 | 
			
		||||
            self.bits[index].fetch_or(mask, Ordering::Relaxed);
 | 
			
		||||
            let prev_val = self.bits[index].fetch_or(mask, Ordering::Relaxed);
 | 
			
		||||
            added = added || prev_val & mask == 0u64;
 | 
			
		||||
        }
 | 
			
		||||
        added
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn contains(&self, key: &T) -> bool {
 | 
			
		||||
@@ -189,6 +194,12 @@ impl<T: BloomHashIndex> AtomicBloom<T> {
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn clear_for_tests(&mut self) {
 | 
			
		||||
        self.bits.iter().for_each(|bit| {
 | 
			
		||||
            bit.store(0u64, Ordering::Relaxed);
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Only for tests and simulations.
 | 
			
		||||
    pub fn mock_clone(&self) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
@@ -320,7 +331,9 @@ mod test {
 | 
			
		||||
        assert_eq!(bloom.keys.len(), 3);
 | 
			
		||||
        assert_eq!(bloom.num_bits, 6168);
 | 
			
		||||
        assert_eq!(bloom.bits.len(), 97);
 | 
			
		||||
        hash_values.par_iter().for_each(|v| bloom.add(v));
 | 
			
		||||
        hash_values.par_iter().for_each(|v| {
 | 
			
		||||
            bloom.add(v);
 | 
			
		||||
        });
 | 
			
		||||
        let bloom: Bloom<Hash> = bloom.into();
 | 
			
		||||
        assert_eq!(bloom.keys.len(), 3);
 | 
			
		||||
        assert_eq!(bloom.bits.len(), 6168);
 | 
			
		||||
@@ -362,7 +375,9 @@ mod test {
 | 
			
		||||
        }
 | 
			
		||||
        // Round trip, re-inserting the same hash values.
 | 
			
		||||
        let bloom: AtomicBloom<_> = bloom.into();
 | 
			
		||||
        hash_values.par_iter().for_each(|v| bloom.add(v));
 | 
			
		||||
        hash_values.par_iter().for_each(|v| {
 | 
			
		||||
            bloom.add(v);
 | 
			
		||||
        });
 | 
			
		||||
        for hash_value in &hash_values {
 | 
			
		||||
            assert!(bloom.contains(hash_value));
 | 
			
		||||
        }
 | 
			
		||||
@@ -380,7 +395,9 @@ mod test {
 | 
			
		||||
        let bloom: AtomicBloom<_> = bloom.into();
 | 
			
		||||
        assert_eq!(bloom.num_bits, 9731);
 | 
			
		||||
        assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
 | 
			
		||||
        more_hash_values.par_iter().for_each(|v| bloom.add(v));
 | 
			
		||||
        more_hash_values.par_iter().for_each(|v| {
 | 
			
		||||
            bloom.add(v);
 | 
			
		||||
        });
 | 
			
		||||
        for hash_value in &hash_values {
 | 
			
		||||
            assert!(bloom.contains(hash_value));
 | 
			
		||||
        }
 | 
			
		||||
 
 | 
			
		||||
@@ -1,45 +1,99 @@
 | 
			
		||||
#![allow(clippy::integer_arithmetic)]
 | 
			
		||||
#![feature(test)]
 | 
			
		||||
 | 
			
		||||
extern crate test;
 | 
			
		||||
 | 
			
		||||
use {
 | 
			
		||||
    rand::prelude::*,
 | 
			
		||||
    solana_bloom::bloom::{AtomicBloom, Bloom},
 | 
			
		||||
    solana_perf::{packet::to_packet_batches, sigverify, test_tx::test_tx},
 | 
			
		||||
    solana_perf::{
 | 
			
		||||
        packet::{to_packet_batches, PacketBatch},
 | 
			
		||||
        sigverify,
 | 
			
		||||
    },
 | 
			
		||||
    test::Bencher,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
#[bench]
 | 
			
		||||
fn bench_dedup_same(bencher: &mut Bencher) {
 | 
			
		||||
    let tx = test_tx();
 | 
			
		||||
fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
 | 
			
		||||
    // subtract 8 bytes because the length will get serialized as well
 | 
			
		||||
    (0..size.checked_sub(8).unwrap())
 | 
			
		||||
        .map(|_| rng.gen())
 | 
			
		||||
        .collect()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
    // generate packet vector
 | 
			
		||||
    let mut batches = to_packet_batches(
 | 
			
		||||
        &std::iter::repeat(tx).take(64 * 1024).collect::<Vec<_>>(),
 | 
			
		||||
fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) {
 | 
			
		||||
    // verify packets
 | 
			
		||||
    let mut bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
 | 
			
		||||
    bencher.iter(|| {
 | 
			
		||||
        // bench
 | 
			
		||||
        sigverify::dedup_packets(&bloom, &mut batches);
 | 
			
		||||
 | 
			
		||||
        // reset
 | 
			
		||||
        bloom.clear_for_tests();
 | 
			
		||||
        batches.iter_mut().for_each(|batch| {
 | 
			
		||||
            batch
 | 
			
		||||
                .packets
 | 
			
		||||
                .iter_mut()
 | 
			
		||||
                .for_each(|p| p.meta.set_discard(false))
 | 
			
		||||
        });
 | 
			
		||||
    })
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[bench]
 | 
			
		||||
#[ignore]
 | 
			
		||||
fn bench_dedup_same_small_packets(bencher: &mut Bencher) {
 | 
			
		||||
    let mut rng = rand::thread_rng();
 | 
			
		||||
    let small_packet = test_packet_with_size(128, &mut rng);
 | 
			
		||||
 | 
			
		||||
    let batches = to_packet_batches(
 | 
			
		||||
        &std::iter::repeat(small_packet)
 | 
			
		||||
            .take(4096)
 | 
			
		||||
            .collect::<Vec<_>>(),
 | 
			
		||||
        128,
 | 
			
		||||
    );
 | 
			
		||||
    let packet_count = sigverify::count_packets_in_batches(&batches);
 | 
			
		||||
    let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
 | 
			
		||||
 | 
			
		||||
    println!("packet_count {} {}", packet_count, batches.len());
 | 
			
		||||
 | 
			
		||||
    // verify packets
 | 
			
		||||
    bencher.iter(|| {
 | 
			
		||||
        let _ans = sigverify::dedup_packets(&bloom, &mut batches);
 | 
			
		||||
    })
 | 
			
		||||
    do_bench_dedup_packets(bencher, batches);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[bench]
 | 
			
		||||
fn bench_dedup_diff(bencher: &mut Bencher) {
 | 
			
		||||
    // generate packet vector
 | 
			
		||||
    let mut batches =
 | 
			
		||||
        to_packet_batches(&(0..64 * 1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
 | 
			
		||||
    let packet_count = sigverify::count_packets_in_batches(&batches);
 | 
			
		||||
    let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
 | 
			
		||||
#[ignore]
 | 
			
		||||
fn bench_dedup_same_big_packets(bencher: &mut Bencher) {
 | 
			
		||||
    let mut rng = rand::thread_rng();
 | 
			
		||||
    let big_packet = test_packet_with_size(1024, &mut rng);
 | 
			
		||||
 | 
			
		||||
    println!("packet_count {} {}", packet_count, batches.len());
 | 
			
		||||
    let batches = to_packet_batches(
 | 
			
		||||
        &std::iter::repeat(big_packet).take(4096).collect::<Vec<_>>(),
 | 
			
		||||
        128,
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    // verify packets
 | 
			
		||||
    bencher.iter(|| {
 | 
			
		||||
        let _ans = sigverify::dedup_packets(&bloom, &mut batches);
 | 
			
		||||
    })
 | 
			
		||||
    do_bench_dedup_packets(bencher, batches);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[bench]
 | 
			
		||||
#[ignore]
 | 
			
		||||
fn bench_dedup_diff_small_packets(bencher: &mut Bencher) {
 | 
			
		||||
    let mut rng = rand::thread_rng();
 | 
			
		||||
 | 
			
		||||
    let batches = to_packet_batches(
 | 
			
		||||
        &(0..4096)
 | 
			
		||||
            .map(|_| test_packet_with_size(128, &mut rng))
 | 
			
		||||
            .collect::<Vec<_>>(),
 | 
			
		||||
        128,
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    do_bench_dedup_packets(bencher, batches);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[bench]
 | 
			
		||||
#[ignore]
 | 
			
		||||
fn bench_dedup_diff_big_packets(bencher: &mut Bencher) {
 | 
			
		||||
    let mut rng = rand::thread_rng();
 | 
			
		||||
 | 
			
		||||
    let batches = to_packet_batches(
 | 
			
		||||
        &(0..4096)
 | 
			
		||||
            .map(|_| test_packet_with_size(1024, &mut rng))
 | 
			
		||||
            .collect::<Vec<_>>(),
 | 
			
		||||
        128,
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
    do_bench_dedup_packets(bencher, batches);
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -426,12 +426,11 @@ fn dedup_packet(count: &AtomicU64, packet: &mut Packet, bloom: &AtomicBloom<&[u8
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if bloom.contains(&packet.data.as_slice()) {
 | 
			
		||||
    // If this packet was not newly added, it's a dup and should be discarded
 | 
			
		||||
    if !bloom.add(&&packet.data.as_slice()[0..packet.meta.size]) {
 | 
			
		||||
        packet.meta.set_discard(true);
 | 
			
		||||
        count.fetch_add(1, Ordering::Relaxed);
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
    bloom.add(&packet.data.as_slice());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn dedup_packets(bloom: &AtomicBloom<&[u8]>, batches: &mut [PacketBatch]) -> u64 {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user