diff --git a/core/src/entry.rs b/core/src/entry.rs index 29e1f9c9e2..42d20b0b53 100644 --- a/core/src/entry.rs +++ b/core/src/entry.rs @@ -8,20 +8,32 @@ use crate::result::Result; use bincode::{deserialize, serialized_size}; use chrono::prelude::Utc; use rayon::prelude::*; +use rayon::ThreadPool; use solana_budget_api::budget_instruction; +use solana_metrics::inc_new_counter_warn; use solana_sdk::hash::{Hash, Hasher}; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::transaction::Transaction; use std::borrow::Borrow; +use std::cell::RefCell; use std::sync::mpsc::{Receiver, Sender}; use std::sync::{Arc, RwLock}; #[cfg(feature = "cuda")] use crate::sigverify::poh_verify_many; +use solana_sdk::timing; #[cfg(feature = "cuda")] use std::sync::Mutex; #[cfg(feature = "cuda")] use std::thread; +use std::time::Instant; + +pub const NUM_THREADS: u32 = 10; + +thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() + .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize) + .build() + .unwrap())); pub type EntrySender = Sender>; pub type EntryReceiver = Receiver>; @@ -211,6 +223,7 @@ where // an EntrySlice is a slice of Entries pub trait EntrySlice { /// Verifies the hashes and counts of a slice of transactions are all consistent. + fn verify_cpu(&self, start_hash: &Hash) -> bool; fn verify(&self, start_hash: &Hash) -> bool; fn to_shared_blobs(&self) -> Vec; fn to_blobs(&self) -> Vec; @@ -219,30 +232,53 @@ pub trait EntrySlice { } impl EntrySlice for [Entry] { - #[cfg(not(feature = "cuda"))] - fn verify(&self, start_hash: &Hash) -> bool { + fn verify_cpu(&self, start_hash: &Hash) -> bool { + let now = Instant::now(); let genesis = [Entry { num_hashes: 0, hash: *start_hash, transactions: vec![], }]; let entry_pairs = genesis.par_iter().chain(self).zip(self); - entry_pairs.all(|(x0, x1)| { - let r = x1.verify(&x0.hash); - if !r { - warn!( - "entry invalid!: x0: {:?}, x1: {:?} num txs: {}", - x0.hash, - x1.hash, - x1.transactions.len() - ); - } - r - }) + let res = PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + entry_pairs.all(|(x0, x1)| { + let r = x1.verify(&x0.hash); + if !r { + warn!( + "entry invalid!: x0: {:?}, x1: {:?} num txs: {}", + x0.hash, + x1.hash, + x1.transactions.len() + ); + } + r + }) + }) + }); + inc_new_counter_warn!( + "entry_verify-duration", + timing::duration_as_ms(&now.elapsed()) as usize + ); + res + } + + #[cfg(not(feature = "cuda"))] + fn verify(&self, start_hash: &Hash) -> bool { + self.verify_cpu(start_hash) } #[cfg(feature = "cuda")] fn verify(&self, start_hash: &Hash) -> bool { + inc_new_counter_warn!("entry_verify-num_entries", self.len() as usize); + + // Use CPU verify if the batch length is < 1K + if self.len() < 1024 { + return self.verify_cpu(start_hash); + } + + let start = Instant::now(); + let genesis = [Entry { num_hashes: 0, hash: *start_hash, @@ -250,7 +286,7 @@ impl EntrySlice for [Entry] { }]; let hashes: Vec = genesis - .par_iter() + .iter() .chain(self) .map(|entry| entry.hash) .take(self.len()) @@ -265,6 +301,7 @@ impl EntrySlice for [Entry] { let hashes = Arc::new(Mutex::new(hashes)); let hashes_clone = hashes.clone(); + let gpu_wait = Instant::now(); let gpu_verify_thread = thread::spawn(move || { let mut hashes = hashes_clone.lock().unwrap(); let res; @@ -281,36 +318,51 @@ impl EntrySlice for [Entry] { } }); - let tx_hashes: Vec> = self - .into_par_iter() - .map(|entry| { - if entry.transactions.is_empty() { - None - } else { - Some(hash_transactions(&entry.transactions)) - } + let tx_hashes: Vec> = PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + self.into_par_iter() + .map(|entry| { + if entry.transactions.is_empty() { + None + } else { + Some(hash_transactions(&entry.transactions)) + } + }) + .collect() }) - .collect(); + }); gpu_verify_thread.join().unwrap(); + inc_new_counter_warn!( + "entry_verify-gpu_thread", + timing::duration_as_ms(&gpu_wait.elapsed()) as usize + ); let hashes = Arc::try_unwrap(hashes).unwrap().into_inner().unwrap(); - hashes - .into_par_iter() - .zip(tx_hashes) - .zip(self) - .all(|((hash, tx_hash), answer)| { - if answer.num_hashes == 0 { - hash == answer.hash - } else { - let mut poh = Poh::new(hash, None); - if let Some(mixin) = tx_hash { - poh.record(mixin).unwrap().hash == answer.hash - } else { - poh.tick().unwrap().hash == answer.hash - } - } - }) + let res = + PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + hashes.into_par_iter().zip(tx_hashes).zip(self).all( + |((hash, tx_hash), answer)| { + if answer.num_hashes == 0 { + hash == answer.hash + } else { + let mut poh = Poh::new(hash, None); + if let Some(mixin) = tx_hash { + poh.record(mixin).unwrap().hash == answer.hash + } else { + poh.tick().unwrap().hash == answer.hash + } + } + }, + ) + }) + }); + inc_new_counter_warn!( + "entry_verify-duration", + timing::duration_as_ms(&start.elapsed()) as usize + ); + res } fn to_blobs(&self) -> Vec {