Thread pool for par_iter in EntrySlice::verify (#4732)

* Use thread pool for entry verify par iter

* some performance metrics

* check batch size and use CPU for smaller batches
This commit is contained in:
Pankaj Garg
2019-06-19 16:31:32 -07:00
committed by GitHub
parent 2e2b1881f5
commit 9800e09431

View File

@@ -8,20 +8,32 @@ use crate::result::Result;
use bincode::{deserialize, serialized_size};
use chrono::prelude::Utc;
use rayon::prelude::*;
use rayon::ThreadPool;
use solana_budget_api::budget_instruction;
use solana_metrics::inc_new_counter_warn;
use solana_sdk::hash::{Hash, Hasher};
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::transaction::Transaction;
use std::borrow::Borrow;
use std::cell::RefCell;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, RwLock};
#[cfg(feature = "cuda")]
use crate::sigverify::poh_verify_many;
use solana_sdk::timing;
#[cfg(feature = "cuda")]
use std::sync::Mutex;
#[cfg(feature = "cuda")]
use std::thread;
use std::time::Instant;
pub const NUM_THREADS: u32 = 10;
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
.build()
.unwrap()));
pub type EntrySender = Sender<Vec<Entry>>;
pub type EntryReceiver = Receiver<Vec<Entry>>;
@@ -211,6 +223,7 @@ where
// an EntrySlice is a slice of Entries
pub trait EntrySlice {
/// Verifies the hashes and counts of a slice of transactions are all consistent.
fn verify_cpu(&self, start_hash: &Hash) -> bool;
fn verify(&self, start_hash: &Hash) -> bool;
fn to_shared_blobs(&self) -> Vec<SharedBlob>;
fn to_blobs(&self) -> Vec<Blob>;
@@ -219,14 +232,16 @@ pub trait EntrySlice {
}
impl EntrySlice for [Entry] {
#[cfg(not(feature = "cuda"))]
fn verify(&self, start_hash: &Hash) -> bool {
fn verify_cpu(&self, start_hash: &Hash) -> bool {
let now = Instant::now();
let genesis = [Entry {
num_hashes: 0,
hash: *start_hash,
transactions: vec![],
}];
let entry_pairs = genesis.par_iter().chain(self).zip(self);
let res = PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
entry_pairs.all(|(x0, x1)| {
let r = x1.verify(&x0.hash);
if !r {
@@ -239,10 +254,31 @@ impl EntrySlice for [Entry] {
}
r
})
})
});
inc_new_counter_warn!(
"entry_verify-duration",
timing::duration_as_ms(&now.elapsed()) as usize
);
res
}
#[cfg(not(feature = "cuda"))]
fn verify(&self, start_hash: &Hash) -> bool {
self.verify_cpu(start_hash)
}
#[cfg(feature = "cuda")]
fn verify(&self, start_hash: &Hash) -> bool {
inc_new_counter_warn!("entry_verify-num_entries", self.len() as usize);
// Use CPU verify if the batch length is < 1K
if self.len() < 1024 {
return self.verify_cpu(start_hash);
}
let start = Instant::now();
let genesis = [Entry {
num_hashes: 0,
hash: *start_hash,
@@ -250,7 +286,7 @@ impl EntrySlice for [Entry] {
}];
let hashes: Vec<Hash> = genesis
.par_iter()
.iter()
.chain(self)
.map(|entry| entry.hash)
.take(self.len())
@@ -265,6 +301,7 @@ impl EntrySlice for [Entry] {
let hashes = Arc::new(Mutex::new(hashes));
let hashes_clone = hashes.clone();
let gpu_wait = Instant::now();
let gpu_verify_thread = thread::spawn(move || {
let mut hashes = hashes_clone.lock().unwrap();
let res;
@@ -281,8 +318,9 @@ impl EntrySlice for [Entry] {
}
});
let tx_hashes: Vec<Option<Hash>> = self
.into_par_iter()
let tx_hashes: Vec<Option<Hash>> = PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
self.into_par_iter()
.map(|entry| {
if entry.transactions.is_empty() {
None
@@ -290,16 +328,22 @@ impl EntrySlice for [Entry] {
Some(hash_transactions(&entry.transactions))
}
})
.collect();
.collect()
})
});
gpu_verify_thread.join().unwrap();
inc_new_counter_warn!(
"entry_verify-gpu_thread",
timing::duration_as_ms(&gpu_wait.elapsed()) as usize
);
let hashes = Arc::try_unwrap(hashes).unwrap().into_inner().unwrap();
hashes
.into_par_iter()
.zip(tx_hashes)
.zip(self)
.all(|((hash, tx_hash), answer)| {
let res =
PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
hashes.into_par_iter().zip(tx_hashes).zip(self).all(
|((hash, tx_hash), answer)| {
if answer.num_hashes == 0 {
hash == answer.hash
} else {
@@ -310,7 +354,15 @@ impl EntrySlice for [Entry] {
poh.tick().unwrap().hash == answer.hash
}
}
},
)
})
});
inc_new_counter_warn!(
"entry_verify-duration",
timing::duration_as_ms(&start.elapsed()) as usize
);
res
}
fn to_blobs(&self) -> Vec<Blob> {