diff --git a/core/src/shred.rs b/core/src/shred.rs index 4c034d7cdf..ea268ade25 100644 --- a/core/src/shred.rs +++ b/core/src/shred.rs @@ -3,8 +3,12 @@ use crate::erasure::Session; use crate::result; use crate::result::Error; use bincode::serialized_size; +use core::cell::RefCell; use lazy_static::lazy_static; +use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator}; +use rayon::ThreadPool; use serde::{Deserialize, Serialize}; +use solana_rayon_threadlimit::get_thread_count; use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; @@ -28,6 +32,11 @@ lazy_static! { static ref SIZE_OF_SHRED_TYPE: usize = { bincode::serialized_size(&0u8).unwrap() as usize }; } +thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new() + .num_threads(get_thread_count()) + .build() + .unwrap())); + /// The constants that define if a shred is data or coding pub const DATA_SHRED: u8 = 0b1010_0101; pub const CODING_SHRED: u8 = 0b0101_1010; @@ -432,16 +441,25 @@ impl Shredder { fn sign_unsigned_shreds_and_generate_codes(&mut self) { let signature_offset = CodingShred::overhead(); let signer = self.signer.clone(); - self.shreds[self.fec_set_shred_start..] - .iter_mut() - .for_each(|d| Self::sign_shred(&signer, d, signature_offset)); + PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + self.shreds[self.fec_set_shred_start..] + .par_iter_mut() + .for_each(|d| Self::sign_shred(&signer, d, signature_offset)); + }) + }); let unsigned_coding_shred_start = self.shreds.len(); self.generate_coding_shreds(); let signature_offset = *SIZE_OF_SHRED_TYPE; - self.shreds[unsigned_coding_shred_start..] - .iter_mut() - .for_each(|d| Self::sign_shred(&signer, d, signature_offset)); + PAR_THREAD_POOL.with(|thread_pool| { + thread_pool.borrow().install(|| { + self.shreds[unsigned_coding_shred_start..] 
+ .par_iter_mut() + .for_each(|d| Self::sign_shred(&signer, d, signature_offset)); + }) + }); + self.fec_set_shred_start = self.shreds.len(); } diff --git a/rayon-threadlimit/src/lib.rs b/rayon-threadlimit/src/lib.rs index 341c462642..608ebc7f3e 100644 --- a/rayon-threadlimit/src/lib.rs +++ b/rayon-threadlimit/src/lib.rs @@ -4,9 +4,12 @@ extern crate lazy_static; use std::sync::RwLock; //TODO remove this hack when rayon fixes itself + lazy_static! { +// reduce the number of threads each pool is allowed to use to half the cpu core count, to avoid rayon +// hogging the cpu static ref MAX_RAYON_THREADS: RwLock<usize> = - RwLock::new(sys_info::cpu_num().unwrap() as usize); + RwLock::new(sys_info::cpu_num().unwrap() as usize / 2); } pub fn get_thread_count() -> usize {