diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 8396bb529b..ab18e10dce 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -418,6 +418,55 @@ pub fn generate_offsets( ) } +fn dedup_packet(packet: &mut Packet, bloom: &AtomicBloom, offset: usize) { + let packet_offsets = get_packet_offsets(packet, 0, reject_non_vote); + let mut sig_start = packet_offsets.sig_start as usize; + let mut pubkey_start = packet_offsets.pubkey_start as usize; + let msg_start = packet_offsets.msg_start as usize; + + // If this packet was already marked as discard, drop it + if packet.meta.discard() { + return; + } + + if packet_offsets.sig_len == 0 { + packet.meta.set_discard(true); + return; + } + + if packet.meta.size <= msg_start { + packet.meta.set_discard(true); + return; + } + + let msg_end = packet.meta.size; + let pubkey_end = pubkey_start.saturating_add(size_of::()); + let sig_end = sig_start.saturating_add(size_of::()); + + let sig_u64 = u64::from_le_bytes(packet.data[sig_start + offset..sig_end]); + if bloom.contains(sig_u64) { + packet.meta.set_discard(true); + return; + } + bloom.add(signature); +} + +pub fn dedup_packets(batches: &mut [PacketBatch]) { + use rayon::prelude::*; + let packet_count = count_packets_in_batches(batches); + let bloom: AtomicBloom<_> = Bloom::random(packet_count, 0.0001, 4096).into(); + let offset = thread_rng().gen_range(0, 64 - 8); + PAR_THREAD_POOL.install(|| { + batches.into_par_iter().for_each(|batch| { + batch + .packets + .par_iter_mut() + .for_each(|p| dedup_packet(p, &bloom, offset)) + }) + }); + inc_new_counter_debug!("dedup_packets", packet_count); +} + pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool) { use rayon::prelude::*; let packet_count = count_packets_in_batches(batches);