Add parallel shred signing to shredder (#5964)
This commit is contained in:
@ -3,8 +3,12 @@ use crate::erasure::Session;
|
|||||||
use crate::result;
|
use crate::result;
|
||||||
use crate::result::Error;
|
use crate::result::Error;
|
||||||
use bincode::serialized_size;
|
use bincode::serialized_size;
|
||||||
|
use core::cell::RefCell;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
|
use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator};
|
||||||
|
use rayon::ThreadPool;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use solana_rayon_threadlimit::get_thread_count;
|
||||||
use solana_sdk::packet::PACKET_DATA_SIZE;
|
use solana_sdk::packet::PACKET_DATA_SIZE;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
|
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
|
||||||
@ -28,6 +32,11 @@ lazy_static! {
|
|||||||
static ref SIZE_OF_SHRED_TYPE: usize = { bincode::serialized_size(&0u8).unwrap() as usize };
|
static ref SIZE_OF_SHRED_TYPE: usize = { bincode::serialized_size(&0u8).unwrap() as usize };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
|
||||||
|
.num_threads(get_thread_count())
|
||||||
|
.build()
|
||||||
|
.unwrap()));
|
||||||
|
|
||||||
/// The constants that define if a shred is data or coding
|
/// The constants that define if a shred is data or coding
|
||||||
pub const DATA_SHRED: u8 = 0b1010_0101;
|
pub const DATA_SHRED: u8 = 0b1010_0101;
|
||||||
pub const CODING_SHRED: u8 = 0b0101_1010;
|
pub const CODING_SHRED: u8 = 0b0101_1010;
|
||||||
@ -432,16 +441,25 @@ impl Shredder {
|
|||||||
fn sign_unsigned_shreds_and_generate_codes(&mut self) {
|
fn sign_unsigned_shreds_and_generate_codes(&mut self) {
|
||||||
let signature_offset = CodingShred::overhead();
|
let signature_offset = CodingShred::overhead();
|
||||||
let signer = self.signer.clone();
|
let signer = self.signer.clone();
|
||||||
|
PAR_THREAD_POOL.with(|thread_pool| {
|
||||||
|
thread_pool.borrow().install(|| {
|
||||||
self.shreds[self.fec_set_shred_start..]
|
self.shreds[self.fec_set_shred_start..]
|
||||||
.iter_mut()
|
.par_iter_mut()
|
||||||
.for_each(|d| Self::sign_shred(&signer, d, signature_offset));
|
.for_each(|d| Self::sign_shred(&signer, d, signature_offset));
|
||||||
|
})
|
||||||
|
});
|
||||||
let unsigned_coding_shred_start = self.shreds.len();
|
let unsigned_coding_shred_start = self.shreds.len();
|
||||||
|
|
||||||
self.generate_coding_shreds();
|
self.generate_coding_shreds();
|
||||||
let signature_offset = *SIZE_OF_SHRED_TYPE;
|
let signature_offset = *SIZE_OF_SHRED_TYPE;
|
||||||
|
PAR_THREAD_POOL.with(|thread_pool| {
|
||||||
|
thread_pool.borrow().install(|| {
|
||||||
self.shreds[unsigned_coding_shred_start..]
|
self.shreds[unsigned_coding_shred_start..]
|
||||||
.iter_mut()
|
.par_iter_mut()
|
||||||
.for_each(|d| Self::sign_shred(&signer, d, signature_offset));
|
.for_each(|d| Self::sign_shred(&signer, d, signature_offset));
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
self.fec_set_shred_start = self.shreds.len();
|
self.fec_set_shred_start = self.shreds.len();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,9 +4,12 @@ extern crate lazy_static;
|
|||||||
use std::sync::RwLock;
|
use std::sync::RwLock;
|
||||||
|
|
||||||
//TODO remove this hack when rayon fixes itself
|
//TODO remove this hack when rayon fixes itself
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
|
// reduce the number of threads each pool is allowed to half the cpu core count, to avoid rayon
|
||||||
|
// hogging cpu
|
||||||
static ref MAX_RAYON_THREADS: RwLock<usize> =
|
static ref MAX_RAYON_THREADS: RwLock<usize> =
|
||||||
RwLock::new(sys_info::cpu_num().unwrap() as usize);
|
RwLock::new(sys_info::cpu_num().unwrap() as usize / 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_thread_count() -> usize {
|
pub fn get_thread_count() -> usize {
|
||||||
|
Reference in New Issue
Block a user