Speed up packet dedup and fix benches (#22592)
* Speed up packet dedup and fix benches
* fix tests
* allow int arithmetic in bench
This commit is contained in:
@ -174,11 +174,16 @@ impl<T: BloomHashIndex> AtomicBloom<T> {
|
|||||||
(index as usize, mask)
|
(index as usize, mask)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn add(&self, key: &T) {
|
/// Adds an item to the bloom filter and returns true if the item
|
||||||
|
/// was not in the filter before.
|
||||||
|
pub fn add(&self, key: &T) -> bool {
|
||||||
|
let mut added = false;
|
||||||
for k in &self.keys {
|
for k in &self.keys {
|
||||||
let (index, mask) = self.pos(key, *k);
|
let (index, mask) = self.pos(key, *k);
|
||||||
self.bits[index].fetch_or(mask, Ordering::Relaxed);
|
let prev_val = self.bits[index].fetch_or(mask, Ordering::Relaxed);
|
||||||
|
added = added || prev_val & mask == 0u64;
|
||||||
}
|
}
|
||||||
|
added
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn contains(&self, key: &T) -> bool {
|
pub fn contains(&self, key: &T) -> bool {
|
||||||
@ -189,6 +194,12 @@ impl<T: BloomHashIndex> AtomicBloom<T> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn clear_for_tests(&mut self) {
|
||||||
|
self.bits.iter().for_each(|bit| {
|
||||||
|
bit.store(0u64, Ordering::Relaxed);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Only for tests and simulations.
|
// Only for tests and simulations.
|
||||||
pub fn mock_clone(&self) -> Self {
|
pub fn mock_clone(&self) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@ -320,7 +331,9 @@ mod test {
|
|||||||
assert_eq!(bloom.keys.len(), 3);
|
assert_eq!(bloom.keys.len(), 3);
|
||||||
assert_eq!(bloom.num_bits, 6168);
|
assert_eq!(bloom.num_bits, 6168);
|
||||||
assert_eq!(bloom.bits.len(), 97);
|
assert_eq!(bloom.bits.len(), 97);
|
||||||
hash_values.par_iter().for_each(|v| bloom.add(v));
|
hash_values.par_iter().for_each(|v| {
|
||||||
|
bloom.add(v);
|
||||||
|
});
|
||||||
let bloom: Bloom<Hash> = bloom.into();
|
let bloom: Bloom<Hash> = bloom.into();
|
||||||
assert_eq!(bloom.keys.len(), 3);
|
assert_eq!(bloom.keys.len(), 3);
|
||||||
assert_eq!(bloom.bits.len(), 6168);
|
assert_eq!(bloom.bits.len(), 6168);
|
||||||
@ -362,7 +375,9 @@ mod test {
|
|||||||
}
|
}
|
||||||
// Round trip, re-inserting the same hash values.
|
// Round trip, re-inserting the same hash values.
|
||||||
let bloom: AtomicBloom<_> = bloom.into();
|
let bloom: AtomicBloom<_> = bloom.into();
|
||||||
hash_values.par_iter().for_each(|v| bloom.add(v));
|
hash_values.par_iter().for_each(|v| {
|
||||||
|
bloom.add(v);
|
||||||
|
});
|
||||||
for hash_value in &hash_values {
|
for hash_value in &hash_values {
|
||||||
assert!(bloom.contains(hash_value));
|
assert!(bloom.contains(hash_value));
|
||||||
}
|
}
|
||||||
@ -380,7 +395,9 @@ mod test {
|
|||||||
let bloom: AtomicBloom<_> = bloom.into();
|
let bloom: AtomicBloom<_> = bloom.into();
|
||||||
assert_eq!(bloom.num_bits, 9731);
|
assert_eq!(bloom.num_bits, 9731);
|
||||||
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
|
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
|
||||||
more_hash_values.par_iter().for_each(|v| bloom.add(v));
|
more_hash_values.par_iter().for_each(|v| {
|
||||||
|
bloom.add(v);
|
||||||
|
});
|
||||||
for hash_value in &hash_values {
|
for hash_value in &hash_values {
|
||||||
assert!(bloom.contains(hash_value));
|
assert!(bloom.contains(hash_value));
|
||||||
}
|
}
|
||||||
|
@ -1,45 +1,99 @@
|
|||||||
|
#![allow(clippy::integer_arithmetic)]
|
||||||
#![feature(test)]
|
#![feature(test)]
|
||||||
|
|
||||||
extern crate test;
|
extern crate test;
|
||||||
|
|
||||||
use {
|
use {
|
||||||
|
rand::prelude::*,
|
||||||
solana_bloom::bloom::{AtomicBloom, Bloom},
|
solana_bloom::bloom::{AtomicBloom, Bloom},
|
||||||
solana_perf::{packet::to_packet_batches, sigverify, test_tx::test_tx},
|
solana_perf::{
|
||||||
|
packet::{to_packet_batches, PacketBatch},
|
||||||
|
sigverify,
|
||||||
|
},
|
||||||
test::Bencher,
|
test::Bencher,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[bench]
|
fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
|
||||||
fn bench_dedup_same(bencher: &mut Bencher) {
|
// subtract 8 bytes because the length will get serialized as well
|
||||||
let tx = test_tx();
|
(0..size.checked_sub(8).unwrap())
|
||||||
|
.map(|_| rng.gen())
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
// generate packet vector
|
fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) {
|
||||||
let mut batches = to_packet_batches(
|
// verify packets
|
||||||
&std::iter::repeat(tx).take(64 * 1024).collect::<Vec<_>>(),
|
let mut bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
|
||||||
|
bencher.iter(|| {
|
||||||
|
// bench
|
||||||
|
sigverify::dedup_packets(&bloom, &mut batches);
|
||||||
|
|
||||||
|
// reset
|
||||||
|
bloom.clear_for_tests();
|
||||||
|
batches.iter_mut().for_each(|batch| {
|
||||||
|
batch
|
||||||
|
.packets
|
||||||
|
.iter_mut()
|
||||||
|
.for_each(|p| p.meta.set_discard(false))
|
||||||
|
});
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
#[ignore]
|
||||||
|
fn bench_dedup_same_small_packets(bencher: &mut Bencher) {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let small_packet = test_packet_with_size(128, &mut rng);
|
||||||
|
|
||||||
|
let batches = to_packet_batches(
|
||||||
|
&std::iter::repeat(small_packet)
|
||||||
|
.take(4096)
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
128,
|
128,
|
||||||
);
|
);
|
||||||
let packet_count = sigverify::count_packets_in_batches(&batches);
|
|
||||||
let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
|
|
||||||
|
|
||||||
println!("packet_count {} {}", packet_count, batches.len());
|
do_bench_dedup_packets(bencher, batches);
|
||||||
|
|
||||||
// verify packets
|
|
||||||
bencher.iter(|| {
|
|
||||||
let _ans = sigverify::dedup_packets(&bloom, &mut batches);
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[bench]
|
#[bench]
|
||||||
fn bench_dedup_diff(bencher: &mut Bencher) {
|
#[ignore]
|
||||||
// generate packet vector
|
fn bench_dedup_same_big_packets(bencher: &mut Bencher) {
|
||||||
let mut batches =
|
let mut rng = rand::thread_rng();
|
||||||
to_packet_batches(&(0..64 * 1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
|
let big_packet = test_packet_with_size(1024, &mut rng);
|
||||||
let packet_count = sigverify::count_packets_in_batches(&batches);
|
|
||||||
let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
|
|
||||||
|
|
||||||
println!("packet_count {} {}", packet_count, batches.len());
|
let batches = to_packet_batches(
|
||||||
|
&std::iter::repeat(big_packet).take(4096).collect::<Vec<_>>(),
|
||||||
|
128,
|
||||||
|
);
|
||||||
|
|
||||||
// verify packets
|
do_bench_dedup_packets(bencher, batches);
|
||||||
bencher.iter(|| {
|
}
|
||||||
let _ans = sigverify::dedup_packets(&bloom, &mut batches);
|
|
||||||
})
|
#[bench]
|
||||||
|
#[ignore]
|
||||||
|
fn bench_dedup_diff_small_packets(bencher: &mut Bencher) {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
|
||||||
|
let batches = to_packet_batches(
|
||||||
|
&(0..4096)
|
||||||
|
.map(|_| test_packet_with_size(128, &mut rng))
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
128,
|
||||||
|
);
|
||||||
|
|
||||||
|
do_bench_dedup_packets(bencher, batches);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[bench]
|
||||||
|
#[ignore]
|
||||||
|
fn bench_dedup_diff_big_packets(bencher: &mut Bencher) {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
|
||||||
|
let batches = to_packet_batches(
|
||||||
|
&(0..4096)
|
||||||
|
.map(|_| test_packet_with_size(1024, &mut rng))
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
128,
|
||||||
|
);
|
||||||
|
|
||||||
|
do_bench_dedup_packets(bencher, batches);
|
||||||
}
|
}
|
||||||
|
@ -426,12 +426,11 @@ fn dedup_packet(count: &AtomicU64, packet: &mut Packet, bloom: &AtomicBloom<&[u8
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if bloom.contains(&packet.data.as_slice()) {
|
// If this packet was not newly added, it's a dup and should be discarded
|
||||||
|
if !bloom.add(&&packet.data.as_slice()[0..packet.meta.size]) {
|
||||||
packet.meta.set_discard(true);
|
packet.meta.set_discard(true);
|
||||||
count.fetch_add(1, Ordering::Relaxed);
|
count.fetch_add(1, Ordering::Relaxed);
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
bloom.add(&packet.data.as_slice());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dedup_packets(bloom: &AtomicBloom<&[u8]>, batches: &mut [PacketBatch]) -> u64 {
|
pub fn dedup_packets(bloom: &AtomicBloom<&[u8]>, batches: &mut [PacketBatch]) -> u64 {
|
||||||
|
Reference in New Issue
Block a user