Optimize packet dedup (#22571) (#22585)

* Use bloom filter to dedup packets

* dedup first

* Update bloom/src/bloom.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* Update core/src/sigverify_stage.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* Update core/src/sigverify_stage.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* Update core/src/sigverify_stage.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* fixup

* fixup

* fixup

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>
(cherry picked from commit d343713f61)

# Conflicts:
#	Cargo.lock
#	core/Cargo.toml
#	core/src/banking_stage.rs
#	core/src/sigverify_stage.rs
#	gossip/Cargo.toml
#	perf/Cargo.toml
#	programs/bpf/Cargo.lock
#	runtime/Cargo.toml

Co-authored-by: anatoly yakovenko <anatoly@solana.com>
Author:    mergify[bot]
Date:      2022-01-20 02:51:49 +00:00
Committer: GitHub
Parent:    37e9076db0
Commit:    dbf9a32883

20 changed files with 303 additions and 117 deletions
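The hunks reproduced below add the dedup primitives to solana-perf (a new dedup_packets() helper plus benches and tests); the core/src/sigverify_stage.rs wiring that calls them is not shown here. As a rough sketch only, with assumed names (run_dedup, recv_batches) and an assumed 2-second reset policy that is not taken from this diff, the new API could be driven like this:

// Sketch only: run_dedup, recv_batches, and the reset interval are
// illustrative assumptions, not code from this commit.
use {
    solana_bloom::bloom::{AtomicBloom, Bloom},
    solana_perf::{packet::PacketBatch, sigverify},
    std::time::{Duration, Instant},
};

// Same filter sizing the benches in this commit use.
const BLOOM_ITEMS: usize = 1_000_000;
const BLOOM_FALSE_RATE: f64 = 0.0001;
const BLOOM_MAX_BITS: usize = 8 << 22;

fn run_dedup(mut recv_batches: impl FnMut() -> Vec<PacketBatch>) {
    let mut bloom: AtomicBloom<&[u8]> =
        Bloom::random(BLOOM_ITEMS, BLOOM_FALSE_RATE, BLOOM_MAX_BITS).into();
    let mut bloom_age = Instant::now();
    loop {
        let mut batches = recv_batches();
        if batches.is_empty() {
            break;
        }
        // Marks duplicate packets as discard in place; returns how many were hit.
        let discarded = sigverify::dedup_packets(&bloom, &mut batches);
        let _ = discarded; // e.g. feed a dedup metric
        // ... hand `batches` to ed25519 verification; discarded packets are skipped ...

        // Rebuild the filter periodically so the false-positive rate does not
        // creep up as it fills (the reset policy is an assumption, not from the diff).
        if bloom_age.elapsed() > Duration::from_secs(2) {
            bloom = Bloom::random(BLOOM_ITEMS, BLOOM_FALSE_RATE, BLOOM_MAX_BITS).into();
            bloom_age = Instant::now();
        }
    }
}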


@@ -15,10 +15,13 @@ curve25519-dalek = { version = "3" }
dlopen = "0.1.8"
dlopen_derive = "0.1.4"
lazy_static = "1.4.0"
bv = { version = "0.11.1", features = ["serde"] }
fnv = "1.0.7"
log = "0.4.14"
rand = "0.7.0"
rayon = "1.5.1"
serde = "1.0.130"
solana-bloom = { path = "../bloom", version = "=1.9.5" }
solana-logger = { path = "../logger", version = "=1.9.5" }
solana-metrics = { path = "../metrics", version = "=1.9.5" }
solana-sdk = { path = "../sdk", version = "=1.9.5" }

perf/benches/dedup.rs (new file, 45 lines)

@@ -0,0 +1,45 @@
#![feature(test)]
extern crate test;
use {
solana_bloom::bloom::{AtomicBloom, Bloom},
solana_perf::{packet::to_packet_batches, sigverify, test_tx::test_tx},
test::Bencher,
};
#[bench]
fn bench_dedup_same(bencher: &mut Bencher) {
let tx = test_tx();
// generate packet vector
let mut batches = to_packet_batches(
&std::iter::repeat(tx).take(64 * 1024).collect::<Vec<_>>(),
128,
);
let packet_count = sigverify::count_packets_in_batches(&batches);
let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
println!("packet_count {} {}", packet_count, batches.len());
// dedup packets
bencher.iter(|| {
let _ans = sigverify::dedup_packets(&bloom, &mut batches);
})
}
#[bench]
fn bench_dedup_diff(bencher: &mut Bencher) {
// generate packet vector
let mut batches =
to_packet_batches(&(0..64 * 1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
let packet_count = sigverify::count_packets_in_batches(&batches);
let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
println!("packet_count {} {}", packet_count, batches.len());
// dedup packets
bencher.iter(|| {
let _ans = sigverify::dedup_packets(&bloom, &mut batches);
})
}
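
A quick sanity check of the filter size these benches request, using the textbook Bloom sizing formulas (the diff does not show Bloom::random's internals, so treat the numbers as approximate): 1,000,000 items at a 0.01% false-positive target want roughly 19.2 Mbit (~2.4 MB) and about 13 hash keys, comfortably under the 8 << 22 ≈ 33.6 Mbit cap passed here; the tests further down cap at 8 << 20 bits, which is below that target and so would constrain the filter.

// Textbook Bloom filter sizing for the bench parameters; approximation only,
// since this diff does not show how Bloom::random derives its bit/key counts.
fn main() {
    let n = 1_000_000_f64; // expected number of distinct packets
    let p = 0.0001_f64; // target false-positive rate
    let m = -n * p.ln() / 2_f64.ln().powi(2); // bits
    let k = (m / n) * 2_f64.ln(); // hash keys
    let cap = (8u64 << 22) as f64; // max_bits passed to Bloom::random
    println!(
        "~{:.1} Mbit (~{:.1} MB), ~{:.0} keys; cap {:.1} Mbit",
        m / 1e6,
        m / 8.0 / 1e6,
        k,
        cap / 1e6
    );
    // Prints roughly: ~19.2 Mbit (~2.4 MB), ~13 keys; cap 33.6 Mbit
}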


@@ -4,6 +4,7 @@
//! to the GPU.
//!
use solana_bloom::bloom::AtomicBloom;
#[cfg(test)]
use solana_sdk::transaction::Transaction;
use {
@@ -23,6 +24,7 @@ use {
short_vec::decode_shortu16_len,
signature::Signature,
},
std::sync::atomic::{AtomicU64, Ordering},
std::{convert::TryFrom, mem::size_of},
};
@@ -418,6 +420,37 @@ pub fn generate_offsets(
)
}
fn dedup_packet(count: &AtomicU64, packet: &mut Packet, bloom: &AtomicBloom<&[u8]>) {
// If this packet was already marked as discard, skip it
if packet.meta.discard() {
return;
}
if bloom.contains(&packet.data.as_slice()) {
packet.meta.set_discard(true);
count.fetch_add(1, Ordering::Relaxed);
return;
}
bloom.add(&packet.data.as_slice());
}
pub fn dedup_packets(bloom: &AtomicBloom<&[u8]>, batches: &mut [PacketBatch]) -> u64 {
use rayon::prelude::*;
let packet_count = count_packets_in_batches(batches);
// counts how many packets get flagged as duplicates
let count = AtomicU64::new(0);
PAR_THREAD_POOL.install(|| {
batches.into_par_iter().for_each(|batch| {
batch
.packets
.par_iter_mut()
.for_each(|p| dedup_packet(&count, p, bloom))
})
});
inc_new_counter_debug!("dedup_packets_total", packet_count);
count.load(Ordering::Relaxed)
}
pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool) {
use rayon::prelude::*;
let packet_count = count_packets_in_batches(batches);
@@ -597,11 +630,12 @@ mod tests {
use {
super::*,
crate::{
packet::{Packet, PacketBatch},
packet::{to_packet_batches, Packet, PacketBatch},
sigverify::{self, PacketOffsets},
test_tx::{new_test_vote_tx, test_multisig_tx, test_tx},
},
bincode::{deserialize, serialize},
solana_bloom::bloom::{AtomicBloom, Bloom},
solana_sdk::{
instruction::CompiledInstruction,
message::{Message, MessageHeader},
@@ -1261,4 +1295,31 @@ mod tests {
current_offset = current_offset.saturating_add(size_of::<Packet>());
});
}
#[test]
fn test_dedup_same() {
let tx = test_tx();
// generate packet vector
let mut batches =
to_packet_batches(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 128);
let packet_count = sigverify::count_packets_in_batches(&batches);
let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 20).into();
let discard = sigverify::dedup_packets(&bloom, &mut batches) as usize;
// because dedup uses a threadpool, there may be up to N threads of txs that go through
let n = get_thread_count();
assert!(packet_count < discard + n * 2);
}
#[test]
fn test_dedup_diff() {
// generate packet vector
let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 20).into();
let discard = sigverify::dedup_packets(&bloom, &mut batches) as usize;
// because dedup uses a threadpool, there may be up to N threads of txs that go through
let n = get_thread_count();
assert!(discard < n * 2);
}
}