Use thread pools for rayon par_iter (#4473)
* Use thread pools for rayon par_iter * address review comments * cleanup
This commit is contained in:
@ -3,6 +3,7 @@ use crate::blocktree::Blocktree;
|
||||
use crate::entry::{Entry, EntrySlice};
|
||||
use crate::leader_schedule_cache::LeaderScheduleCache;
|
||||
use rayon::prelude::*;
|
||||
use rayon::ThreadPool;
|
||||
use solana_metrics::{datapoint, datapoint_error, inc_new_counter_debug};
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_runtime::locked_accounts_results::LockedAccountsResults;
|
||||
@ -15,6 +16,14 @@ use std::result;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
pub const NUM_THREADS: u32 = 10;
|
||||
use std::cell::RefCell;
|
||||
|
||||
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
|
||||
.build()
|
||||
.unwrap()));
|
||||
|
||||
fn first_err(results: &[Result<()>]) -> Result<()> {
|
||||
for r in results {
|
||||
if r.is_err() {
|
||||
@ -29,32 +38,36 @@ fn par_execute_entries(
|
||||
entries: &[(&Entry, LockedAccountsResults<Transaction>)],
|
||||
) -> Result<()> {
|
||||
inc_new_counter_debug!("bank-par_execute_entries-count", entries.len());
|
||||
let results: Vec<Result<()>> = entries
|
||||
.into_par_iter()
|
||||
.map(|(e, locked_accounts)| {
|
||||
let results = bank.load_execute_and_commit_transactions(
|
||||
&e.transactions,
|
||||
locked_accounts,
|
||||
MAX_RECENT_BLOCKHASHES,
|
||||
);
|
||||
let mut first_err = None;
|
||||
for (r, tx) in results.iter().zip(e.transactions.iter()) {
|
||||
if let Err(ref e) = r {
|
||||
if first_err.is_none() {
|
||||
first_err = Some(r.clone());
|
||||
let results: Vec<Result<()>> = PAR_THREAD_POOL.with(|thread_pool| {
|
||||
thread_pool.borrow().install(|| {
|
||||
entries
|
||||
.into_par_iter()
|
||||
.map(|(e, locked_accounts)| {
|
||||
let results = bank.load_execute_and_commit_transactions(
|
||||
&e.transactions,
|
||||
locked_accounts,
|
||||
MAX_RECENT_BLOCKHASHES,
|
||||
);
|
||||
let mut first_err = None;
|
||||
for (r, tx) in results.iter().zip(e.transactions.iter()) {
|
||||
if let Err(ref e) = r {
|
||||
if first_err.is_none() {
|
||||
first_err = Some(r.clone());
|
||||
}
|
||||
if !Bank::can_commit(&r) {
|
||||
warn!("Unexpected validator error: {:?}, tx: {:?}", e, tx);
|
||||
datapoint_error!(
|
||||
"validator_process_entry_error",
|
||||
("error", format!("error: {:?}, tx: {:?}", e, tx), String)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
if !Bank::can_commit(&r) {
|
||||
warn!("Unexpected validator error: {:?}, tx: {:?}", e, tx);
|
||||
datapoint_error!(
|
||||
"validator_process_entry_error",
|
||||
("error", format!("error: {:?}, tx: {:?}", e, tx), String)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
first_err.unwrap_or(Ok(()))
|
||||
first_err.unwrap_or(Ok(()))
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.collect();
|
||||
});
|
||||
|
||||
first_err(&results)
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ use crate::result::{Error, Result};
|
||||
use crate::service::Service;
|
||||
use crate::staking_utils;
|
||||
use rayon::prelude::*;
|
||||
use rayon::ThreadPool;
|
||||
use solana_metrics::{
|
||||
datapoint, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_info,
|
||||
inc_new_counter_warn,
|
||||
@ -24,6 +25,8 @@ use std::sync::{Arc, RwLock};
|
||||
use std::thread::{self, Builder, JoinHandle};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
pub const NUM_THREADS: u32 = 10;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub enum BroadcastStageReturnType {
|
||||
ChannelDisconnected,
|
||||
@ -40,6 +43,7 @@ struct Broadcast {
|
||||
id: Pubkey,
|
||||
coding_generator: CodingGenerator,
|
||||
stats: BroadcastStats,
|
||||
thread_pool: ThreadPool,
|
||||
}
|
||||
|
||||
impl Broadcast {
|
||||
@ -96,14 +100,16 @@ impl Broadcast {
|
||||
|
||||
let to_blobs_start = Instant::now();
|
||||
|
||||
let blobs: Vec<_> = ventries
|
||||
.into_par_iter()
|
||||
.map(|p| {
|
||||
let entries: Vec<_> = p.into_iter().map(|e| e.0).collect();
|
||||
entries.to_shared_blobs()
|
||||
})
|
||||
.flatten()
|
||||
.collect();
|
||||
let blobs: Vec<_> = self.thread_pool.install(|| {
|
||||
ventries
|
||||
.into_par_iter()
|
||||
.map(|p| {
|
||||
let entries: Vec<_> = p.into_iter().map(|e| e.0).collect();
|
||||
entries.to_shared_blobs()
|
||||
})
|
||||
.flatten()
|
||||
.collect()
|
||||
});
|
||||
|
||||
let blob_index = blocktree
|
||||
.meta(bank.slot())
|
||||
@ -218,6 +224,10 @@ impl BroadcastStage {
|
||||
id: me.id,
|
||||
coding_generator,
|
||||
stats: BroadcastStats::default(),
|
||||
thread_pool: rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
|
||||
.build()
|
||||
.unwrap(),
|
||||
};
|
||||
|
||||
loop {
|
||||
|
@ -7,6 +7,7 @@
|
||||
use crate::packet::{Packet, Packets};
|
||||
use crate::result::Result;
|
||||
use bincode::serialized_size;
|
||||
use rayon::ThreadPool;
|
||||
use solana_metrics::inc_new_counter_debug;
|
||||
use solana_sdk::message::MessageHeader;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
@ -16,6 +17,14 @@ use solana_sdk::signature::Signature;
|
||||
use solana_sdk::transaction::Transaction;
|
||||
use std::mem::size_of;
|
||||
|
||||
pub const NUM_THREADS: u32 = 10;
|
||||
use std::cell::RefCell;
|
||||
|
||||
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
|
||||
.build()
|
||||
.unwrap()));
|
||||
|
||||
type TxOffsets = (Vec<u32>, Vec<u32>, Vec<u32>, Vec<u32>, Vec<Vec<u32>>);
|
||||
|
||||
#[cfg(feature = "cuda")]
|
||||
@ -174,10 +183,14 @@ pub fn ed25519_verify_cpu(batches: &[Packets]) -> Vec<Vec<u8>> {
|
||||
use rayon::prelude::*;
|
||||
let count = batch_size(batches);
|
||||
debug!("CPU ECDSA for {}", batch_size(batches));
|
||||
let rv = batches
|
||||
.into_par_iter()
|
||||
.map(|p| p.packets.par_iter().map(verify_packet).collect())
|
||||
.collect();
|
||||
let rv = PAR_THREAD_POOL.with(|thread_pool| {
|
||||
thread_pool.borrow().install(|| {
|
||||
batches
|
||||
.into_par_iter()
|
||||
.map(|p| p.packets.par_iter().map(verify_packet).collect())
|
||||
.collect()
|
||||
})
|
||||
});
|
||||
inc_new_counter_debug!("ed25519_verify_cpu", count);
|
||||
rv
|
||||
}
|
||||
|
Reference in New Issue
Block a user