Remove redundant threadpools in sigverify (bp #7888) (#7890)

automerge
This commit is contained in:
mergify[bot]
2020-01-20 21:31:56 -08:00
committed by Grimes
parent 3509f1158f
commit 35e7b2f975
7 changed files with 82 additions and 82 deletions

View File

@ -58,6 +58,7 @@ pub const BLOCKSTORE_DIRECTORY: &str = "rocksdb";
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new() thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count()) .num_threads(get_thread_count())
.thread_name(|ix| format!("blockstore_{}", ix))
.build() .build()
.unwrap())); .unwrap()));

View File

@ -38,6 +38,7 @@ use thiserror::Error;
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new() thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count()) .num_threads(get_thread_count())
.thread_name(|ix| format!("blockstore_processor_{}", ix))
.build() .build()
.unwrap()) .unwrap())
); );

View File

@ -27,6 +27,7 @@ use std::time::Instant;
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new() thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count()) .num_threads(get_thread_count())
.thread_name(|ix| format!("entry_{}", ix))
.build() .build()
.unwrap())); .unwrap()));

View File

@ -25,3 +25,6 @@ extern crate solana_metrics;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use]
extern crate lazy_static;

View File

@ -40,6 +40,7 @@ pub const SIZE_OF_DATA_SHRED_PAYLOAD: usize = PACKET_DATA_SIZE
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new() thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count()) .num_threads(get_thread_count())
.thread_name(|ix| format!("shredder_{}", ix))
.build() .build()
.unwrap())); .unwrap()));

View File

@ -20,15 +20,17 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature; use solana_sdk::signature::Signature;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::Arc; use std::sync::Arc;
use std::{cell::RefCell, collections::HashMap, mem::size_of}; use std::{collections::HashMap, mem::size_of};
pub const SIGN_SHRED_GPU_MIN: usize = 256; pub const SIGN_SHRED_GPU_MIN: usize = 256;
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new() lazy_static! {
.num_threads(get_thread_count()) pub static ref SIGVERIFY_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
.thread_name(|ix| format!("sigverify_shreds_{}", ix)) .num_threads(get_thread_count())
.build() .thread_name(|ix| format!("sigverify_shreds_{}", ix))
.unwrap())); .build()
.unwrap();
}
/// Assuming layout is /// Assuming layout is
/// signature: Signature /// signature: Signature
@ -67,18 +69,16 @@ fn verify_shreds_cpu(batches: &[Packets], slot_leaders: &HashMap<u64, [u8; 32]>)
use rayon::prelude::*; use rayon::prelude::*;
let count = batch_size(batches); let count = batch_size(batches);
debug!("CPU SHRED ECDSA for {}", count); debug!("CPU SHRED ECDSA for {}", count);
let rv = PAR_THREAD_POOL.with(|thread_pool| { let rv = SIGVERIFY_THREAD_POOL.install(|| {
thread_pool.borrow().install(|| { batches
batches .into_par_iter()
.into_par_iter() .map(|p| {
.map(|p| { p.packets
p.packets .par_iter()
.par_iter() .map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0))
.map(|p| verify_shred_cpu(p, slot_leaders).unwrap_or(0)) .collect()
.collect() })
}) .collect()
.collect()
})
}); });
inc_new_counter_debug!("ed25519_shred_verify_cpu", count); inc_new_counter_debug!("ed25519_shred_verify_cpu", count);
rv rv
@ -94,30 +94,28 @@ fn slot_key_data_for_gpu<
) -> (PinnedVec<u8>, TxOffset, usize) { ) -> (PinnedVec<u8>, TxOffset, usize) {
//TODO: mark Pubkey::default shreds as failed after the GPU returns //TODO: mark Pubkey::default shreds as failed after the GPU returns
assert_eq!(slot_keys.get(&std::u64::MAX), Some(&T::default())); assert_eq!(slot_keys.get(&std::u64::MAX), Some(&T::default()));
let slots: Vec<Vec<u64>> = PAR_THREAD_POOL.with(|thread_pool| { let slots: Vec<Vec<u64>> = SIGVERIFY_THREAD_POOL.install(|| {
thread_pool.borrow().install(|| { batches
batches .into_par_iter()
.into_par_iter() .map(|p| {
.map(|p| { p.packets
p.packets .iter()
.iter() .map(|packet| {
.map(|packet| { let slot_start = size_of::<Signature>() + size_of::<ShredType>();
let slot_start = size_of::<Signature>() + size_of::<ShredType>(); let slot_end = slot_start + size_of::<u64>();
let slot_end = slot_start + size_of::<u64>(); if packet.meta.size < slot_end {
if packet.meta.size < slot_end { return std::u64::MAX;
return std::u64::MAX; }
} let slot: Option<u64> =
let slot: Option<u64> = limited_deserialize(&packet.data[slot_start..slot_end]).ok();
limited_deserialize(&packet.data[slot_start..slot_end]).ok(); match slot {
match slot { Some(slot) if slot_keys.get(&slot).is_some() => slot,
Some(slot) if slot_keys.get(&slot).is_some() => slot, _ => std::u64::MAX,
_ => std::u64::MAX, }
} })
}) .collect()
.collect() })
}) .collect()
.collect()
})
}); });
let mut keys_to_slots: HashMap<T, Vec<u64>> = HashMap::new(); let mut keys_to_slots: HashMap<T, Vec<u64>> = HashMap::new();
for batch in slots.iter() { for batch in slots.iter() {
@ -309,14 +307,12 @@ pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [Packets]) {
use rayon::prelude::*; use rayon::prelude::*;
let count = batch_size(batches); let count = batch_size(batches);
debug!("CPU SHRED ECDSA for {}", count); debug!("CPU SHRED ECDSA for {}", count);
PAR_THREAD_POOL.with(|thread_pool| { SIGVERIFY_THREAD_POOL.install(|| {
thread_pool.borrow().install(|| { batches.par_iter_mut().for_each(|p| {
batches.par_iter_mut().for_each(|p| { p.packets[..]
p.packets[..] .par_iter_mut()
.par_iter_mut() .for_each(|mut p| sign_shred_cpu(keypair, &mut p));
.for_each(|mut p| sign_shred_cpu(keypair, &mut p)); });
});
})
}); });
inc_new_counter_debug!("ed25519_shred_verify_cpu", count); inc_new_counter_debug!("ed25519_shred_verify_cpu", count);
} }
@ -422,25 +418,23 @@ pub fn sign_shreds_gpu(
} }
sizes[i] += sizes[i - 1]; sizes[i] += sizes[i - 1];
} }
PAR_THREAD_POOL.with(|thread_pool| { SIGVERIFY_THREAD_POOL.install(|| {
thread_pool.borrow().install(|| { batches
batches .par_iter_mut()
.par_iter_mut() .enumerate()
.enumerate() .for_each(|(batch_ix, batch)| {
.for_each(|(batch_ix, batch)| { let num_packets = sizes[batch_ix];
let num_packets = sizes[batch_ix]; batch.packets[..]
batch.packets[..] .par_iter_mut()
.par_iter_mut() .enumerate()
.enumerate() .for_each(|(packet_ix, packet)| {
.for_each(|(packet_ix, packet)| { let sig_ix = packet_ix + num_packets;
let sig_ix = packet_ix + num_packets; let sig_start = sig_ix * sig_size;
let sig_start = sig_ix * sig_size; let sig_end = sig_start + sig_size;
let sig_end = sig_start + sig_size; packet.data[0..sig_size]
packet.data[0..sig_size] .copy_from_slice(&signatures_out[sig_start..sig_end]);
.copy_from_slice(&signatures_out[sig_start..sig_end]); });
}); });
});
});
}); });
inc_new_counter_debug!("ed25519_shred_sign_gpu", count); inc_new_counter_debug!("ed25519_shred_sign_gpu", count);
} }

View File

@ -18,14 +18,15 @@ use solana_sdk::short_vec::decode_len;
use solana_sdk::signature::Signature; use solana_sdk::signature::Signature;
#[cfg(test)] #[cfg(test)]
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use std::cell::RefCell;
use std::mem::size_of; use std::mem::size_of;
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new() lazy_static! {
.num_threads(get_thread_count()) static ref PAR_THREAD_POOL: ThreadPool = rayon::ThreadPoolBuilder::new()
.thread_name(|ix| format!("sigverify_{}", ix)) .num_threads(get_thread_count())
.build() .thread_name(|ix| format!("sigverify_{}", ix))
.unwrap())); .build()
.unwrap();
}
pub type TxOffset = PinnedVec<u32>; pub type TxOffset = PinnedVec<u32>;
@ -247,13 +248,11 @@ pub fn ed25519_verify_cpu(batches: &[Packets]) -> Vec<Vec<u8>> {
use rayon::prelude::*; use rayon::prelude::*;
let count = batch_size(batches); let count = batch_size(batches);
debug!("CPU ECDSA for {}", batch_size(batches)); debug!("CPU ECDSA for {}", batch_size(batches));
let rv = PAR_THREAD_POOL.with(|thread_pool| { let rv = PAR_THREAD_POOL.install(|| {
thread_pool.borrow().install(|| { batches
batches .into_par_iter()
.into_par_iter() .map(|p| p.packets.par_iter().map(verify_packet).collect())
.map(|p| p.packets.par_iter().map(verify_packet).collect()) .collect()
.collect()
})
}); });
inc_new_counter_debug!("ed25519_verify_cpu", count); inc_new_counter_debug!("ed25519_verify_cpu", count);
rv rv