diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 02785dd53d..3e1fe7e13c 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -299,7 +299,11 @@ impl ClusterInfoVoteListener { let mut packet_batches = packet::to_packet_batches(&votes, 1); // Votes should already be filtered by this point. - sigverify::ed25519_verify_cpu(&mut packet_batches, /*reject_non_vote=*/ false); + sigverify::ed25519_verify_cpu( + &mut packet_batches, + /*reject_non_vote=*/ false, + votes.len(), + ); let root_bank = bank_forks.read().unwrap().root_bank(); let epoch_schedule = root_bank.epoch_schedule(); votes diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 74dbf5bdfc..a00f06059c 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -40,12 +40,17 @@ impl Default for TransactionSigVerifier { } impl SigVerifier for TransactionSigVerifier { - fn verify_batches(&self, mut batches: Vec) -> Vec { + fn verify_batches( + &self, + mut batches: Vec, + valid_packets: usize, + ) -> Vec { sigverify::ed25519_verify( &mut batches, &self.recycler, &self.recycler_out, self.reject_non_vote, + valid_packets, ); batches } diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index e9309b434f..533147d629 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -41,7 +41,11 @@ impl ShredSigVerifier { } impl SigVerifier for ShredSigVerifier { - fn verify_batches(&self, mut batches: Vec) -> Vec { + fn verify_batches( + &self, + mut batches: Vec, + _valid_packets: usize, + ) -> Vec { let r_bank = self.bank_forks.read().unwrap().working_bank(); let slots: HashSet = Self::read_slots(&batches); let mut leader_slots: HashMap = slots @@ -160,8 +164,15 @@ pub mod tests { batches[0].packets[1].data[0..shred.payload.len()].copy_from_slice(&shred.payload); batches[0].packets[1].meta.size = shred.payload.len(); +<<<<<<< HEAD let rv = verifier.verify_batches(batches); assert!(!rv[0].packets[0].meta.discard); assert!(rv[0].packets[1].meta.discard); +======= + let num_packets = solana_perf::sigverify::count_packets_in_batches(&batches); + let rv = verifier.verify_batches(batches, num_packets); + assert!(!rv[0].packets[0].meta.discard()); + assert!(rv[0].packets[1].meta.discard()); +>>>>>>> 2e56c59bc (Handle already discarded packets in gpu sigverify path (#22680)) } } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 4d9a260281..4a56bd539b 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -41,7 +41,7 @@ pub struct SigVerifyStage { } pub trait SigVerifier { - fn verify_batches(&self, batches: Vec) -> Vec; + fn verify_batches(&self, batches: Vec, valid_packets: usize) -> Vec; } #[derive(Default, Clone)] @@ -172,7 +172,11 @@ impl SigVerifierStats { } impl SigVerifier for DisabledSigVerifier { - fn verify_batches(&self, mut batches: Vec) -> Vec { + fn verify_batches( + &self, + mut batches: Vec, + _valid_packets: usize, + ) -> Vec { sigverify::ed25519_verify_disabled(&mut batches); batches } @@ -236,14 +240,16 @@ impl SigVerifyStage { let num_unique = num_packets.saturating_sub(dedup_fail); let mut discard_time = Measure::start("sigverify_discard_time"); + let mut num_valid_packets = num_unique; if num_unique > MAX_SIGVERIFY_BATCH { - Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH) - }; + Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH); + num_valid_packets = MAX_SIGVERIFY_BATCH; + } let excess_fail = 
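// Sketch (illustrative, not part of the patch): the `valid_packets` hint that
// `verify_batches` now receives is simply "packets not already marked discard" after
// dedup and excess-packet shedding. Assuming the `PacketBatch { packets }` layout and the
// accessor-style `Meta::discard()` API used on the incoming side of this patch, the same
// count could be derived directly from the batches:
fn count_valid_packets(batches: &[PacketBatch]) -> usize {
    batches
        .iter()
        .map(|batch| batch.packets.iter().filter(|packet| !packet.meta.discard()).count())
        .sum()
}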
num_unique.saturating_sub(MAX_SIGVERIFY_BATCH); discard_time.stop(); let mut verify_batch_time = Measure::start("sigverify_batch_time"); - let batches = verifier.verify_batches(batches); + let batches = verifier.verify_batches(batches, num_valid_packets); sendr.send(batches)?; verify_batch_time.stop(); diff --git a/ledger/src/entry.rs b/ledger/src/entry.rs index 272262f571..e40a6d5ada 100644 --- a/ledger/src/entry.rs +++ b/ledger/src/entry.rs @@ -29,7 +29,11 @@ use { cmp, ffi::OsStr, sync::{ +<<<<<<< HEAD:ledger/src/entry.rs mpsc::{Receiver, Sender}, +======= + atomic::{AtomicUsize, Ordering}, +>>>>>>> 2e56c59bc (Handle already discarded packets in gpu sigverify path (#22680)):entry/src/entry.rs Arc, Mutex, Once, }, thread::{self, JoinHandle}, @@ -332,6 +336,202 @@ impl EntryVerificationState { } } +<<<<<<< HEAD:ledger/src/entry.rs +======= +pub fn verify_transactions( + entries: Vec, + verify: Arc Result + Send + Sync>, +) -> Result> { + PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + entries + .into_par_iter() + .map(|entry| { + if entry.transactions.is_empty() { + Ok(EntryType::Tick(entry.hash)) + } else { + Ok(EntryType::Transactions( + entry + .transactions + .into_par_iter() + .map(verify.as_ref()) + .collect::>>()?, + )) + } + }) + .collect() + }) + }) +} + +pub fn start_verify_transactions( + entries: Vec, + skip_verification: bool, + verify_recyclers: VerifyRecyclers, + verify: Arc< + dyn Fn(VersionedTransaction, TransactionVerificationMode) -> Result + + Send + + Sync, + >, +) -> Result { + let api = perf_libs::api(); + + // Use the CPU if we have too few transactions for GPU signature verification to be worth it. + // We will also use the CPU if no acceleration API is used or if we're skipping + // the signature verification as we'd have nothing to do on the GPU in that case. 
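// Sketch (illustrative): the `try_fold` in the code just below is an early-exit counter.
// It returns `Some(running_total)` only while the transaction count stays under the GPU
// threshold, so `.is_some()` reads as "few enough transactions to verify on the CPU".
// The same idea as a standalone helper, assuming `Entry { transactions, .. }` from this
// file; `few_enough_for_cpu(&entries, 512)` mirrors the inline check below:
fn few_enough_for_cpu(entries: &[Entry], threshold: usize) -> bool {
    entries
        .iter()
        .try_fold(0usize, |accum, entry| {
            let total = accum.saturating_add(entry.transactions.len());
            if total < threshold {
                Some(total)
            } else {
                None // bail out early: we already know the GPU path is worthwhile
            }
        })
        .is_some()
}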
+ // TODO: make the CPU-to GPU crossover point dynamic, perhaps based on similar future + // heuristics to what might be used in sigverify::ed25519_verify when a dynamic crossover + // is introduced for that function (see TODO in sigverify::ed25519_verify) + let use_cpu = skip_verification + || api.is_none() + || entries + .iter() + .try_fold(0, |accum: usize, entry: &Entry| -> Option { + if accum.saturating_add(entry.transactions.len()) < 512 { + Some(accum.saturating_add(entry.transactions.len())) + } else { + None + } + }) + .is_some(); + + if use_cpu { + let verify_func = { + let verification_mode = if skip_verification { + TransactionVerificationMode::HashOnly + } else { + TransactionVerificationMode::FullVerification + }; + move |versioned_tx: VersionedTransaction| -> Result { + verify(versioned_tx, verification_mode) + } + }; + + let entries = verify_transactions(entries, Arc::new(verify_func)); + + match entries { + Ok(entries_val) => { + return Ok(EntrySigVerificationState { + verification_status: EntryVerificationStatus::Success, + entries: Some(entries_val), + device_verification_data: DeviceSigVerificationData::Cpu(), + gpu_verify_duration_us: 0, + }); + } + Err(err) => { + return Err(err); + } + } + } + + let verify_func = { + move |versioned_tx: VersionedTransaction| -> Result { + verify( + versioned_tx, + TransactionVerificationMode::HashAndVerifyPrecompiles, + ) + } + }; + let entries = verify_transactions(entries, Arc::new(verify_func)); + match entries { + Ok(entries) => { + let num_transactions: usize = entries + .iter() + .map(|entry: &EntryType| -> usize { + match entry { + EntryType::Transactions(transactions) => transactions.len(), + EntryType::Tick(_) => 0, + } + }) + .sum(); + + if num_transactions == 0 { + return Ok(EntrySigVerificationState { + verification_status: EntryVerificationStatus::Success, + entries: Some(entries), + device_verification_data: DeviceSigVerificationData::Cpu(), + gpu_verify_duration_us: 0, + }); + } + let entry_txs: Vec<&SanitizedTransaction> = entries + .iter() + .filter_map(|entry_type| match entry_type { + EntryType::Tick(_) => None, + EntryType::Transactions(transactions) => Some(transactions), + }) + .flatten() + .collect::>(); + let total_packets = AtomicUsize::new(0); + let mut packet_batches = entry_txs + .par_iter() + .chunks(PACKETS_PER_BATCH) + .map(|slice| { + let vec_size = slice.len(); + total_packets.fetch_add(vec_size, Ordering::Relaxed); + let mut packet_batch = PacketBatch::new_with_recycler( + verify_recyclers.packet_recycler.clone(), + vec_size, + "entry-sig-verify", + ); + // We use set_len here instead of resize(num_transactions, Packet::default()), to save + // memory bandwidth and avoid writing a large amount of data that will be overwritten + // soon afterwards. As well, Packet::default() actually leaves the packet data + // uninitialized anyway, so the initilization would simply write junk into + // the vector anyway. 
+ unsafe { + packet_batch.packets.set_len(vec_size); + } + let entry_tx_iter = slice + .into_par_iter() + .map(|tx| tx.to_versioned_transaction()); + + let res = packet_batch + .packets + .par_iter_mut() + .zip(entry_tx_iter) + .all(|pair| { + pair.0.meta = Meta::default(); + Packet::populate_packet(pair.0, None, &pair.1).is_ok() + }); + if res { + Ok(packet_batch) + } else { + Err(TransactionError::SanitizeFailure) + } + }) + .collect::>>()?; + + let tx_offset_recycler = verify_recyclers.tx_offset_recycler; + let out_recycler = verify_recyclers.out_recycler; + let gpu_verify_thread = thread::spawn(move || { + let mut verify_time = Measure::start("sigverify"); + sigverify::ed25519_verify( + &mut packet_batches, + &tx_offset_recycler, + &out_recycler, + false, + total_packets.load(Ordering::Relaxed), + ); + let verified = packet_batches + .iter() + .all(|batch| batch.packets.iter().all(|p| !p.meta.discard())); + verify_time.stop(); + (verified, verify_time.as_us()) + }); + Ok(EntrySigVerificationState { + verification_status: EntryVerificationStatus::Pending, + entries: Some(entries), + device_verification_data: DeviceSigVerificationData::Gpu(GpuSigVerificationData { + thread_h: Some(gpu_verify_thread), + }), + gpu_verify_duration_us: 0, + }) + } + Err(err) => Err(err), + } +} + +>>>>>>> 2e56c59bc (Handle already discarded packets in gpu sigverify path (#22680)):entry/src/entry.rs fn compare_hashes(computed_hash: Hash, ref_entry: &Entry) -> bool { let actual = if !ref_entry.transactions.is_empty() { let tx_hash = hash_transactions(&ref_entry.transactions); diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index 7c60f362b7..fec6e23007 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -3,22 +3,84 @@ extern crate test; use { - solana_perf::{packet::to_packet_batches, recycler::Recycler, sigverify, test_tx::test_tx}, + log::*, + rand::{thread_rng, Rng}, + solana_perf::{ + packet::{to_packet_batches, Packet, PacketBatch}, + recycler::Recycler, + sigverify, + test_tx::{test_multisig_tx, test_tx}, + }, test::Bencher, }; +const NUM: usize = 256; + #[bench] -fn bench_sigverify(bencher: &mut Bencher) { +fn bench_sigverify_simple(bencher: &mut Bencher) { let tx = test_tx(); + let num_packets = NUM; // generate packet vector - let mut batches = to_packet_batches(&std::iter::repeat(tx).take(128).collect::>(), 128); + let mut batches = to_packet_batches( + &std::iter::repeat(tx).take(num_packets).collect::>(), + 128, + ); let recycler = Recycler::default(); let recycler_out = Recycler::default(); // verify packets bencher.iter(|| { - let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false); + let _ans = + sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets); + }) +} + +#[bench] +#[ignore] +fn bench_sigverify_uneven(bencher: &mut Bencher) { + solana_logger::setup(); + let simple_tx = test_tx(); + let multi_tx = test_multisig_tx(); + let mut tx; + + let num_packets = NUM * 50; + let mut num_valid = 0; + let mut current_packets = 0; + // generate packet vector + let mut batches = vec![]; + while current_packets < num_packets { + let mut len: usize = thread_rng().gen_range(1, 128); + current_packets += len; + if current_packets > num_packets { + len -= current_packets - num_packets; + current_packets = num_packets; + } + let mut batch = PacketBatch::with_capacity(len); + batch.packets.resize(len, Packet::default()); + for packet in batch.packets.iter_mut() { + if thread_rng().gen_ratio(1, 2) { + tx = 
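// Sketch (illustrative): the GPU path above returns `EntryVerificationStatus::Pending`
// plus a join handle; the owner of the `EntrySigVerificationState` later joins that thread
// to learn whether every packet survived sigverify and how long the GPU pass took.
// Assuming the `GpuSigVerificationData { thread_h: Option<JoinHandle<(bool, u64)>> }`
// shape constructed above, the hand-off could look like:
fn resolve_gpu_verification(data: &mut GpuSigVerificationData) -> (bool, u64) {
    data.thread_h
        .take()
        .and_then(|handle| handle.join().ok())
        // Treat a missing or panicked verification thread as a failure.
        .unwrap_or((false, 0))
}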
simple_tx.clone(); + } else { + tx = multi_tx.clone(); + }; + Packet::populate_packet(packet, None, &tx).expect("serialize request"); + if thread_rng().gen_ratio((num_packets - NUM) as u32, num_packets as u32) { + packet.meta.set_discard(true); + } else { + num_valid += 1; + } + } + batches.push(batch); + } + info!("num_packets: {} valid: {}", num_packets, num_valid); + + let recycler = Recycler::default(); + let recycler_out = Recycler::default(); + // verify packets + bencher.iter(|| { + let _ans = + sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, num_packets); }) } diff --git a/perf/src/packet.rs b/perf/src/packet.rs index d8c163a7af..0d258bc9d3 100644 --- a/perf/src/packet.rs +++ b/perf/src/packet.rs @@ -82,16 +82,16 @@ impl PacketBatch { } pub fn to_packet_batches(xs: &[T], chunks: usize) -> Vec { - let mut out = vec![]; - for x in xs.chunks(chunks) { - let mut batch = PacketBatch::with_capacity(x.len()); - batch.packets.resize(x.len(), Packet::default()); - for (i, packet) in x.iter().zip(batch.packets.iter_mut()) { - Packet::populate_packet(packet, None, i).expect("serialize request"); - } - out.push(batch); - } - out + xs.chunks(chunks) + .map(|x| { + let mut batch = PacketBatch::with_capacity(x.len()); + batch.packets.resize(x.len(), Packet::default()); + for (i, packet) in x.iter().zip(batch.packets.iter_mut()) { + Packet::populate_packet(packet, None, i).expect("serialize request"); + } + batch + }) + .collect() } #[cfg(test)] diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index 61a8a9a938..3f1fd3f713 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -355,41 +355,45 @@ pub fn generate_offsets( let mut msg_sizes: PinnedVec<_> = recycler.allocate("msg_size_offsets"); msg_sizes.set_pinnable(); let mut current_offset: usize = 0; - let mut v_sig_lens = Vec::new(); - batches.iter_mut().for_each(|batch| { - let mut sig_lens = Vec::new(); - batch.packets.iter_mut().for_each(|packet| { - let packet_offsets = get_packet_offsets(packet, current_offset, reject_non_vote); + let offsets = batches + .iter_mut() + .map(|batch| { + batch + .packets + .iter_mut() + .map(|packet| { + let packet_offsets = + get_packet_offsets(packet, current_offset, reject_non_vote); - sig_lens.push(packet_offsets.sig_len); + trace!("pubkey_offset: {}", packet_offsets.pubkey_start); - trace!("pubkey_offset: {}", packet_offsets.pubkey_start); + let mut pubkey_offset = packet_offsets.pubkey_start; + let mut sig_offset = packet_offsets.sig_start; + let msg_size = current_offset.saturating_add(packet.meta.size) as u32; + for _ in 0..packet_offsets.sig_len { + signature_offsets.push(sig_offset); + sig_offset = sig_offset.saturating_add(size_of::() as u32); - let mut pubkey_offset = packet_offsets.pubkey_start; - let mut sig_offset = packet_offsets.sig_start; - let msg_size = current_offset.saturating_add(packet.meta.size) as u32; - for _ in 0..packet_offsets.sig_len { - signature_offsets.push(sig_offset); - sig_offset = sig_offset.saturating_add(size_of::() as u32); + pubkey_offsets.push(pubkey_offset); + pubkey_offset = pubkey_offset.saturating_add(size_of::() as u32); - pubkey_offsets.push(pubkey_offset); - pubkey_offset = pubkey_offset.saturating_add(size_of::() as u32); + msg_start_offsets.push(packet_offsets.msg_start); - msg_start_offsets.push(packet_offsets.msg_start); - - let msg_size = msg_size.saturating_sub(packet_offsets.msg_start); - msg_sizes.push(msg_size); - } - current_offset = current_offset.saturating_add(size_of::()); - }); - 
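// Sketch (illustrative): usage of the refactored `to_packet_batches` above -- it serializes
// a slice of items into `PacketBatch`es of at most `chunks` packets each. Assumes
// `solana_perf::test_tx::test_tx()` as used in the benches above:
fn example_batches() -> Vec<PacketBatch> {
    let txs: Vec<_> = std::iter::repeat(test_tx()).take(300).collect();
    let batches = to_packet_batches(&txs, 128);
    // 300 transactions split into chunks of 128 -> batches of 128, 128 and 44 packets.
    assert_eq!(
        batches.iter().map(|b| b.packets.len()).collect::<Vec<_>>(),
        vec![128usize, 128, 44]
    );
    batches
}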
v_sig_lens.push(sig_lens); - }); + let msg_size = msg_size.saturating_sub(packet_offsets.msg_start); + msg_sizes.push(msg_size); + } + current_offset = current_offset.saturating_add(size_of::()); + packet_offsets.sig_len + }) + .collect() + }) + .collect(); ( signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, - v_sig_lens, + offsets, ) } @@ -462,9 +466,8 @@ impl Deduper { } } -pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool) { +pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool, packet_count: usize) { use rayon::prelude::*; - let packet_count = count_packets_in_batches(batches); debug!("CPU ECDSA for {}", packet_count); PAR_THREAD_POOL.install(|| { batches.into_par_iter().for_each(|batch| { @@ -542,11 +545,21 @@ pub fn get_checked_scalar(scalar: &[u8; 32]) -> Result<[u8; 32], PacketError> { } pub fn mark_disabled(batches: &mut [PacketBatch], r: &[Vec]) { +<<<<<<< HEAD batches.iter_mut().zip(r).for_each(|(b, v)| { b.packets.iter_mut().zip(v).for_each(|(p, f)| { p.meta.discard = *f == 0; }) }); +======= + for (batch, v) in batches.iter_mut().zip(r) { + for (pkt, f) in batch.packets.iter_mut().zip(v) { + if !pkt.meta.discard() { + pkt.meta.set_discard(*f == 0); + } + } + } +>>>>>>> 2e56c59bc (Handle already discarded packets in gpu sigverify path (#22680)) } pub fn ed25519_verify( @@ -554,29 +567,35 @@ pub fn ed25519_verify( recycler: &Recycler, recycler_out: &Recycler>, reject_non_vote: bool, + valid_packet_count: usize, ) { let api = perf_libs::api(); if api.is_none() { - return ed25519_verify_cpu(batches, reject_non_vote); + return ed25519_verify_cpu(batches, reject_non_vote, valid_packet_count); } let api = api.unwrap(); use crate::packet::PACKET_DATA_SIZE; - let packet_count = count_packets_in_batches(batches); + let total_packet_count = count_packets_in_batches(batches); // micro-benchmarks show GPU time for smallest batch around 15-20ms // and CPU speed for 64-128 sigverifies around 10-20ms. 
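// Sketch (illustrative): the essential change to `mark_disabled` above. A packet that was
// already discarded (e.g. by dedup or excess-packet shedding) must stay discarded even if
// the GPU reports its signature as valid; previously a "1" in the result vector would
// un-discard it. Assuming the accessor-style `Meta` API from the incoming side of the
// conflict, `gpu_ok` being the per-packet byte from the GPU output (1 = signature valid):
fn apply_gpu_result(meta: &mut Meta, gpu_ok: u8) {
    if !meta.discard() {
        meta.set_discard(gpu_ok == 0);
    }
}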
64 is a nice // power-of-two number around that accounting for the fact that the CPU // may be busy doing other things while being a real validator // TODO: dynamically adjust this crossover - if packet_count < 64 { - return ed25519_verify_cpu(batches, reject_non_vote); + if valid_packet_count < 64 + || 100usize + .wrapping_mul(valid_packet_count) + .wrapping_div(total_packet_count) + < 90 + { + return ed25519_verify_cpu(batches, reject_non_vote, valid_packet_count); } let (signature_offsets, pubkey_offsets, msg_start_offsets, msg_sizes, sig_lens) = generate_offsets(batches, recycler, reject_non_vote); - debug!("CUDA ECDSA for {}", packet_count); + debug!("CUDA ECDSA for {}", valid_packet_count); debug!("allocating out.."); let mut out = recycler_out.allocate("out_buffer"); out.set_pinnable(); @@ -589,8 +608,7 @@ pub fn ed25519_verify( elems: batch.packets.as_ptr(), num: batch.packets.len() as u32, }); - let mut v = Vec::new(); - v.resize(batch.packets.len(), 0); + let v = vec![0u8; batch.packets.len()]; rvs.push(v); num_packets = num_packets.saturating_add(batch.packets.len()); } @@ -621,7 +639,7 @@ pub fn ed25519_verify( trace!("done verify"); copy_return_values(&sig_lens, &out, &mut rvs); mark_disabled(batches, &rvs); - inc_new_counter_debug!("ed25519_verify_gpu", packet_count); + inc_new_counter_debug!("ed25519_verify_gpu", valid_packet_count); } #[cfg(test)] @@ -673,7 +691,12 @@ mod tests { batch.packets.push(Packet::default()); let mut batches: Vec = vec![batch]; mark_disabled(&mut batches, &[vec![0]]); +<<<<<<< HEAD assert!(batches[0].packets[0].meta.discard); +======= + assert!(batches[0].packets[0].meta.discard()); + batches[0].packets[0].meta.set_discard(false); +>>>>>>> 2e56c59bc (Handle already discarded packets in gpu sigverify path (#22680)) mark_disabled(&mut batches, &[vec![1]]); assert!(!batches[0].packets[0].meta.discard); } @@ -954,6 +977,29 @@ mod tests { ); } + fn generate_packet_batches_random_size( + packet: &Packet, + max_packets_per_batch: usize, + num_batches: usize, + ) -> Vec { + // generate packet vector + let batches: Vec<_> = (0..num_batches) + .map(|_| { + let mut packet_batch = PacketBatch::default(); + packet_batch.packets.resize(0, Packet::default()); + let num_packets_per_batch = thread_rng().gen_range(1, max_packets_per_batch); + for _ in 0..num_packets_per_batch { + packet_batch.packets.push(packet.clone()); + } + assert_eq!(packet_batch.packets.len(), num_packets_per_batch); + packet_batch + }) + .collect(); + assert_eq!(batches.len(), num_batches); + + batches + } + fn generate_packet_batches( packet: &Packet, num_packets_per_batch: usize, @@ -1001,7 +1047,8 @@ mod tests { fn ed25519_verify(batches: &mut [PacketBatch]) { let recycler = Recycler::default(); let recycler_out = Recycler::default(); - sigverify::ed25519_verify(batches, &recycler, &recycler_out, false); + let packet_count = sigverify::count_packets_in_batches(batches); + sigverify::ed25519_verify(batches, &recycler, &recycler_out, false, packet_count); } #[test] @@ -1082,9 +1129,8 @@ mod tests { let recycler = Recycler::default(); let recycler_out = Recycler::default(); for _ in 0..50 { - let n = thread_rng().gen_range(1, 30); let num_batches = thread_rng().gen_range(2, 30); - let mut batches = generate_packet_batches(&packet, n, num_batches); + let mut batches = generate_packet_batches_random_size(&packet, 128, num_batches); let num_modifications = thread_rng().gen_range(0, 5); for _ in 0..num_modifications { @@ -1096,11 +1142,17 @@ mod tests { 
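// Sketch (illustrative): the dispatch rule introduced above, as a standalone predicate.
// Stay on the CPU when fewer than 64 packets still need verification, or when less than
// 90% of the packets in the batches are still valid, so the GPU batch would be dominated
// by packets whose results are ignored. Saturating/checked math replaces the wrapping ops
// used in the patch; the division cannot be by zero because `total >= valid >= 64`
// whenever it is evaluated.
fn should_use_gpu(valid_packet_count: usize, total_packet_count: usize) -> bool {
    valid_packet_count >= 64
        && valid_packet_count
            .saturating_mul(100)
            .checked_div(total_packet_count)
            .unwrap_or(0)
            >= 90
}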
                    batches[batch].packets[packet].data[offset].wrapping_add(add);
            }
 
+        let batch_to_disable = thread_rng().gen_range(0, batches.len());
+        for p in batches[batch_to_disable].packets.iter_mut() {
+            p.meta.set_discard(true);
+        }
+
         // verify from GPU verification pipeline (when GPU verification is enabled) are
         // equivalent to the CPU verification pipeline.
         let mut batches_cpu = batches.clone();
-        sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false);
-        ed25519_verify_cpu(&mut batches_cpu, false);
+        let packet_count = sigverify::count_packets_in_batches(&batches);
+        sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out, false, packet_count);
+        ed25519_verify_cpu(&mut batches_cpu, false, packet_count);
         // check result batches
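// Sketch (illustrative): one way to compare the two result sets after both runs above --
// every packet should carry the same discard flag whether it went through the GPU path
// (`batches`) or the CPU path (`batches_cpu`). The patch's actual assertions are truncated
// in this excerpt; this is not taken from them.
fn assert_same_discards(gpu: &[PacketBatch], cpu: &[PacketBatch]) {
    for (gpu_batch, cpu_batch) in gpu.iter().zip(cpu.iter()) {
        for (gpu_packet, cpu_packet) in gpu_batch.packets.iter().zip(cpu_batch.packets.iter()) {
            assert_eq!(gpu_packet.meta.discard(), cpu_packet.meta.discard());
        }
    }
}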