From 2e56c59bcb2fde78303f11fa75f873b5d77ecbb0 Mon Sep 17 00:00:00 2001 From: sakridge Date: Mon, 24 Jan 2022 05:35:47 -0800 Subject: [PATCH] Handle already discarded packets in gpu sigverify path (#22680) --- core/src/cluster_info_vote_listener.rs | 6 +- core/src/sigverify.rs | 7 +- core/src/sigverify_shreds.rs | 9 +- core/src/sigverify_stage.rs | 16 +++- entry/src/entry.rs | 8 +- perf/benches/sigverify.rs | 70 +++++++++++++- perf/src/packet.rs | 20 ++-- perf/src/sigverify.rs | 122 ++++++++++++++++--------- 8 files changed, 193 insertions(+), 65 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index c4da643342..28fbdccac7 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -297,7 +297,11 @@ impl ClusterInfoVoteListener { let mut packet_batches = packet::to_packet_batches(&votes, 1); // Votes should already be filtered by this point. - sigverify::ed25519_verify_cpu(&mut packet_batches, /*reject_non_vote=*/ false); + sigverify::ed25519_verify_cpu( + &mut packet_batches, + /*reject_non_vote=*/ false, + votes.len(), + ); let root_bank = bank_forks.read().unwrap().root_bank(); let epoch_schedule = root_bank.epoch_schedule(); votes diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 74dbf5bdfc..a00f06059c 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -40,12 +40,17 @@ impl Default for TransactionSigVerifier { } impl SigVerifier for TransactionSigVerifier { - fn verify_batches(&self, mut batches: Vec) -> Vec { + fn verify_batches( + &self, + mut batches: Vec, + valid_packets: usize, + ) -> Vec { sigverify::ed25519_verify( &mut batches, &self.recycler, &self.recycler_out, self.reject_non_vote, + valid_packets, ); batches } diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index 626dc516f5..403ebb8c0f 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -41,7 +41,11 @@ impl ShredSigVerifier { } impl SigVerifier for ShredSigVerifier { - fn verify_batches(&self, mut batches: Vec) -> Vec { + fn verify_batches( + &self, + mut batches: Vec, + _valid_packets: usize, + ) -> Vec { let r_bank = self.bank_forks.read().unwrap().working_bank(); let slots: HashSet = Self::read_slots(&batches); let mut leader_slots: HashMap = slots @@ -161,7 +165,8 @@ pub mod tests { batches[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload); batches[0].packets[1].meta.size = shred.payload.len(); - let rv = verifier.verify_batches(batches); + let num_packets = solana_perf::sigverify::count_packets_in_batches(&batches); + let rv = verifier.verify_batches(batches, num_packets); assert!(!rv[0].packets[0].meta.discard()); assert!(rv[0].packets[1].meta.discard()); } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index f9373b8c14..ec9dc69283 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -40,7 +40,7 @@ pub struct SigVerifyStage { } pub trait SigVerifier { - fn verify_batches(&self, batches: Vec) -> Vec; + fn verify_batches(&self, batches: Vec, valid_packets: usize) -> Vec; } #[derive(Default, Clone)] @@ -171,7 +171,11 @@ impl SigVerifierStats { } impl SigVerifier for DisabledSigVerifier { - fn verify_batches(&self, mut batches: Vec) -> Vec { + fn verify_batches( + &self, + mut batches: Vec, + _valid_packets: usize, + ) -> Vec { sigverify::ed25519_verify_disabled(&mut batches); batches } @@ -235,14 +239,16 @@ impl SigVerifyStage { let num_unique = num_packets.saturating_sub(dedup_fail); let mut discard_time = Measure::start("sigverify_discard_time"); + let mut num_valid_packets = num_unique; if num_unique > MAX_SIGVERIFY_BATCH { - Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH) - }; + Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH); + num_valid_packets = MAX_SIGVERIFY_BATCH; + } let excess_fail = num_unique.saturating_sub(MAX_SIGVERIFY_BATCH); discard_time.stop(); let mut verify_batch_time = Measure::start("sigverify_batch_time"); - let batches = verifier.verify_batches(batches); + let batches = verifier.verify_batches(batches, num_valid_packets); sendr.send(batches)?; verify_batch_time.stop(); diff --git a/entry/src/entry.rs b/entry/src/entry.rs index 1615b86519..4ecf6a5509 100644 --- a/entry/src/entry.rs +++ b/entry/src/entry.rs @@ -35,7 +35,10 @@ use { cell::RefCell, cmp, ffi::OsStr, - sync::{Arc, Mutex, Once}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, Once, + }, thread::{self, JoinHandle}, time::Instant, }, @@ -497,11 +500,13 @@ pub fn start_verify_transactions( }) .flatten() .collect::>(); + let total_packets = AtomicUsize::new(0); let mut packet_batches = entry_txs .par_iter() .chunks(PACKETS_PER_BATCH) .map(|slice| { let vec_size = slice.len(); + total_packets.fetch_add(vec_size, Ordering::Relaxed); let mut packet_batch = PacketBatch::new_with_recycler( verify_recyclers.packet_recycler.clone(), vec_size, @@ -544,6 +549,7 @@ pub fn start_verify_transactions( &tx_offset_recycler, &out_recycler, false, + total_packets.load(Ordering::Relaxed), ); let verified = packet_batches .iter() diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index 7c60f362b7..fec6e23007 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -3,22 +3,84 @@ extern crate test; use { - solana_perf::{packet::to_packet_batches, recycler::Recycler, sigverify, test_tx::test_tx}, + log::*, + rand::{thread_rng, Rng}, + solana_perf::{ + packet::{to_packet_batches, Packet, PacketBatch}, + recycler::Recycler, + sigverify, + test_tx::{test_multisig_tx, test_tx}, + }, test::Bencher, }; +const NUM: usize = 256; + #[bench] -fn bench_sigverify(bencher: &mut Bencher) { +fn bench_sigverify_simple(bencher: &mut Bencher) { let tx = test_tx(); + let num_packets = NUM; // generate packet vector - let mut batches = to_packet_batches(&std::iter::repeat(tx).take(128).collect::>(), 128); + let mut batches = to_packet_batches( + &std::iter::repeat(tx).take(num_packets).collect::>(), + 128, + ); let recycler = Recycler::default(); let recycler_out = Recycler::default(); // verify packets bencher.iter(|| { - let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false); + let _ans = + sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets); + }) +} + +#[bench] +#[ignore] +fn bench_sigverify_uneven(bencher: &mut Bencher) { + solana_logger::setup(); + let simple_tx = test_tx(); + let multi_tx = test_multisig_tx(); + let mut tx; + + let num_packets = NUM * 50; + let mut num_valid = 0; + let mut current_packets = 0; + // generate packet vector + let mut batches = vec![]; + while current_packets < num_packets { + let mut len: usize = thread_rng().gen_range(1, 128); + current_packets += len; + if current_packets > num_packets { + len -= current_packets - num_packets; + current_packets = num_packets; + } + let mut batch = PacketBatch::with_capacity(len); + batch.packets.resize(len, Packet::default()); + for packet in batch.packets.iter_mut() { + if thread_rng().gen_ratio(1, 2) { + tx = simple_tx.clone(); + } else { + tx = multi_tx.clone(); + }; + Packet::populate_packet(packet, None, &tx).expect("serialize request"); + if thread_rng().gen_ratio((num_packets - NUM) as u32, num_packets as u32) { + packet.meta.set_discard(true); + } else { + num_valid += 1; + } + } + batches.push(batch); + } + info!("num_packets: {} valid: {}", num_packets, num_valid); + + let recycler = Recycler::default(); + let recycler_out = Recycler::default(); + // verify packets + bencher.iter(|| { + let _ans = + sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets); }) } diff --git a/perf/src/packet.rs b/perf/src/packet.rs index 7b2222a0f1..610bbcbb7d 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -82,16 +82,16 @@ impl PacketBatch { } pub fn to_packet_batches(xs: &[T], chunks: usize) -> Vec { - let mut out = vec![]; - for x in xs.chunks(chunks) { - let mut batch = PacketBatch::with_capacity(x.len()); - batch.packets.resize(x.len(), Packet::default()); - for (i, packet) in x.iter().zip(batch.packets.iter_mut()) { - Packet::populate_packet(packet, None, i).expect("serialize request"); - } - out.push(batch); - } - out + xs.chunks(chunks) + .map(|x| { + let mut batch = PacketBatch::with_capacity(x.len()); + batch.packets.resize(x.len(), Packet::default()); + for (i, packet) in x.iter().zip(batch.packets.iter_mut()) { + Packet::populate_packet(packet, None, i).expect("serialize request"); + } + batch + }) + .collect() } #[cfg(test)] diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 6228cbfd0d..83a895d790 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -385,41 +385,45 @@ pub fn generate_offsets( let mut msg_sizes: PinnedVec<_> = recycler.allocate("msg_size_offsets"); msg_sizes.set_pinnable(); let mut current_offset: usize = 0; - let mut v_sig_lens = Vec::new(); - batches.iter_mut().for_each(|batch| { - let mut sig_lens = Vec::new(); - batch.packets.iter_mut().for_each(|packet| { - let packet_offsets = get_packet_offsets(packet, current_offset, reject_non_vote); + let offsets = batches + .iter_mut() + .map(|batch| { + batch + .packets + .iter_mut() + .map(|packet| { + let packet_offsets = + get_packet_offsets(packet, current_offset, reject_non_vote); - sig_lens.push(packet_offsets.sig_len); + trace!("pubkey_offset: {}", packet_offsets.pubkey_start); - trace!("pubkey_offset: {}", packet_offsets.pubkey_start); + let mut pubkey_offset = packet_offsets.pubkey_start; + let mut sig_offset = packet_offsets.sig_start; + let msg_size = current_offset.saturating_add(packet.meta.size) as u32; + for _ in 0..packet_offsets.sig_len { + signature_offsets.push(sig_offset); + sig_offset = sig_offset.saturating_add(size_of::() as u32); - let mut pubkey_offset = packet_offsets.pubkey_start; - let mut sig_offset = packet_offsets.sig_start; - let msg_size = current_offset.saturating_add(packet.meta.size) as u32; - for _ in 0..packet_offsets.sig_len { - signature_offsets.push(sig_offset); - sig_offset = sig_offset.saturating_add(size_of::() as u32); + pubkey_offsets.push(pubkey_offset); + pubkey_offset = pubkey_offset.saturating_add(size_of::() as u32); - pubkey_offsets.push(pubkey_offset); - pubkey_offset = pubkey_offset.saturating_add(size_of::() as u32); + msg_start_offsets.push(packet_offsets.msg_start); - msg_start_offsets.push(packet_offsets.msg_start); - - let msg_size = msg_size.saturating_sub(packet_offsets.msg_start); - msg_sizes.push(msg_size); - } - current_offset = current_offset.saturating_add(size_of::()); - }); - v_sig_lens.push(sig_lens); - }); + let msg_size = msg_size.saturating_sub(packet_offsets.msg_start); + msg_sizes.push(msg_size); + } + current_offset = current_offset.saturating_add(size_of::()); + packet_offsets.sig_len + }) + .collect() + }) + .collect(); ( signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, - v_sig_lens, + offsets, ) } @@ -492,9 +496,8 @@ impl Deduper { } } -pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool) { +pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, packet_count: usize) { use rayon::prelude::*; - let packet_count = count_packets_in_batches(batches); debug!("CPU ECDSA for {}", packet_count); PAR_THREAD_POOL.install(|| { batches.into_par_iter().for_each(|batch| { @@ -574,7 +577,9 @@ pub fn get_checked_scalar(scalar: &[u8; 32]) -> Result<[u8; 32], PacketError> { pub fn mark_disabled(batches: &mut [PacketBatch], r: &[Vec]) { for (batch, v) in batches.iter_mut().zip(r) { for (pkt, f) in batch.packets.iter_mut().zip(v) { - pkt.meta.set_discard(*f == 0); + if !pkt.meta.discard() { + pkt.meta.set_discard(*f == 0); + } } } } @@ -584,29 +589,35 @@ pub fn ed25519_verify( recycler: &Recycler, recycler_out: &Recycler>, reject_non_vote: bool, + valid_packet_count: usize, ) { let api = perf_libs::api(); if api.is_none() { - return ed25519_verify_cpu(batches, reject_non_vote); + return ed25519_verify_cpu(batches, reject_non_vote, valid_packet_count); } let api = api.unwrap(); use crate::packet::PACKET_DATA_SIZE; - let packet_count = count_packets_in_batches(batches); + let total_packet_count = count_packets_in_batches(batches); // micro-benchmarks show GPU time for smallest batch around 15-20ms // and CPU speed for 64-128 sigverifies around 10-20ms. 64 is a nice // power-of-two number around that accounting for the fact that the CPU // may be busy doing other things while being a real validator // TODO: dynamically adjust this crossover - if packet_count < 64 { - return ed25519_verify_cpu(batches, reject_non_vote); + if valid_packet_count < 64 + || 100usize + .wrapping_mul(valid_packet_count) + .wrapping_div(total_packet_count) + < 90 + { + return ed25519_verify_cpu(batches, reject_non_vote, valid_packet_count); } let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) = generate_offsets(batches, recycler, reject_non_vote); - debug!("CUDA ECDSA for {}", packet_count); + debug!("CUDA ECDSA for {}", valid_packet_count); debug!("allocating out.."); let mut out = recycler_out.allocate("out_buffer"); out.set_pinnable(); @@ -619,8 +630,7 @@ pub fn ed25519_verify( elems: batch.packets.as_ptr(), num: batch.packets.len() as u32, }); - let mut v = Vec::new(); - v.resize(batch.packets.len(), 0); + let v = vec![0u8; batch.packets.len()]; rvs.push(v); num_packets = num_packets.saturating_add(batch.packets.len()); } @@ -651,7 +661,7 @@ pub fn ed25519_verify( trace!("done verify"); copy_return_values(&sig_lens, &out, &mut rvs); mark_disabled(batches, &rvs); - inc_new_counter_debug!("ed25519_verify_gpu", packet_count); + inc_new_counter_debug!("ed25519_verify_gpu", valid_packet_count); } #[cfg(test)] @@ -704,6 +714,7 @@ mod tests { let mut batches: Vec = vec![batch]; mark_disabled(&mut batches, &[vec![0]]); assert!(batches[0].packets[0].meta.discard()); + batches[0].packets[0].meta.set_discard(false); mark_disabled(&mut batches, &[vec![1]]); assert!(!batches[0].packets[0].meta.discard()); } @@ -1005,6 +1016,29 @@ mod tests { ); } + fn generate_packet_batches_random_size( + packet: &Packet, + max_packets_per_batch: usize, + num_batches: usize, + ) -> Vec { + // generate packet vector + let batches: Vec<_> = (0..num_batches) + .map(|_| { + let mut packet_batch = PacketBatch::default(); + packet_batch.packets.resize(0, Packet::default()); + let num_packets_per_batch = thread_rng().gen_range(1, max_packets_per_batch); + for _ in 0..num_packets_per_batch { + packet_batch.packets.push(packet.clone()); + } + assert_eq!(packet_batch.packets.len(), num_packets_per_batch); + packet_batch + }) + .collect(); + assert_eq!(batches.len(), num_batches); + + batches + } + fn generate_packet_batches( packet: &Packet, num_packets_per_batch: usize, @@ -1052,7 +1086,8 @@ mod tests { fn ed25519_verify(batches: &mut [PacketBatch]) { let recycler = Recycler::default(); let recycler_out = Recycler::default(); - sigverify::ed25519_verify(batches, &recycler, &recycler_out, false); + let packet_count = sigverify::count_packets_in_batches(batches); + sigverify::ed25519_verify(batches, &recycler, &recycler_out, false, packet_count); } #[test] @@ -1133,9 +1168,8 @@ mod tests { let recycler = Recycler::default(); let recycler_out = Recycler::default(); for _ in 0..50 { - let n = thread_rng().gen_range(1, 30); let num_batches = thread_rng().gen_range(2, 30); - let mut batches = generate_packet_batches(&packet, n, num_batches); + let mut batches = generate_packet_batches_random_size(&packet, 128, num_batches); let num_modifications = thread_rng().gen_range(0, 5); for _ in 0..num_modifications { @@ -1147,11 +1181,17 @@ mod tests { batches[batch].packets[packet].data[offset].wrapping_add(add); } + let batch_to_disable = thread_rng().gen_range(0, batches.len()); + for p in batches[batch_to_disable].packets.iter_mut() { + p.meta.set_discard(true); + } + // verify from GPU verification pipeline (when GPU verification is enabled) are // equivalent to the CPU verification pipeline. let mut batches_cpu = batches.clone(); - sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false); - ed25519_verify_cpu(&mut batches_cpu, false); + let packet_count = sigverify::count_packets_in_batches(&batches); + sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, packet_count); + ed25519_verify_cpu(&mut batches_cpu, false, packet_count); // check result batches