Name sigverify threads
This commit is contained in:
committed by
sakridge
parent
1f35779821
commit
92b5e131fe
@ -10,13 +10,12 @@ use crate::result::{Error, Result};
|
|||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::sigverify;
|
use crate::sigverify;
|
||||||
use crate::streamer::{self, PacketReceiver};
|
use crate::streamer::{self, PacketReceiver};
|
||||||
use rand::{thread_rng, Rng};
|
|
||||||
use solana_metrics::counter::Counter;
|
use solana_metrics::counter::Counter;
|
||||||
use solana_metrics::{influxdb, submit};
|
use solana_metrics::{influxdb, submit};
|
||||||
use solana_sdk::timing;
|
use solana_sdk::timing;
|
||||||
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
|
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::thread::{self, spawn, JoinHandle};
|
use std::thread::{self, Builder, JoinHandle};
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
pub type VerifiedPackets = Vec<(SharedPackets, Vec<u8>)>;
|
pub type VerifiedPackets = Vec<(SharedPackets, Vec<u8>)>;
|
||||||
@ -51,6 +50,7 @@ impl SigVerifyStage {
|
|||||||
recvr: &Arc<Mutex<PacketReceiver>>,
|
recvr: &Arc<Mutex<PacketReceiver>>,
|
||||||
sendr: &Sender<VerifiedPackets>,
|
sendr: &Sender<VerifiedPackets>,
|
||||||
sigverify_disabled: bool,
|
sigverify_disabled: bool,
|
||||||
|
id: usize,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let (batch, len, recv_time) =
|
let (batch, len, recv_time) =
|
||||||
streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
|
streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
|
||||||
@ -58,12 +58,11 @@ impl SigVerifyStage {
|
|||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let batch_len = batch.len();
|
let batch_len = batch.len();
|
||||||
let rand_id = thread_rng().gen_range(0, 100);
|
debug!(
|
||||||
info!(
|
|
||||||
"@{:?} verifier: verifying: {} id: {}",
|
"@{:?} verifier: verifying: {} id: {}",
|
||||||
timing::timestamp(),
|
timing::timestamp(),
|
||||||
batch.len(),
|
batch.len(),
|
||||||
rand_id
|
id
|
||||||
);
|
);
|
||||||
|
|
||||||
let verified_batch = Self::verify_batch(batch, sigverify_disabled);
|
let verified_batch = Self::verify_batch(batch, sigverify_disabled);
|
||||||
@ -79,12 +78,12 @@ impl SigVerifyStage {
|
|||||||
"sigverify_stage-time_ms",
|
"sigverify_stage-time_ms",
|
||||||
(total_time_ms + recv_time) as usize
|
(total_time_ms + recv_time) as usize
|
||||||
);
|
);
|
||||||
info!(
|
debug!(
|
||||||
"@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}",
|
"@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}",
|
||||||
timing::timestamp(),
|
timing::timestamp(),
|
||||||
batch_len,
|
batch_len,
|
||||||
total_time_ms,
|
total_time_ms,
|
||||||
rand_id,
|
id,
|
||||||
len,
|
len,
|
||||||
(len as f32 / total_time_s)
|
(len as f32 / total_time_s)
|
||||||
);
|
);
|
||||||
@ -107,19 +106,25 @@ impl SigVerifyStage {
|
|||||||
packet_receiver: Arc<Mutex<PacketReceiver>>,
|
packet_receiver: Arc<Mutex<PacketReceiver>>,
|
||||||
verified_sender: Sender<VerifiedPackets>,
|
verified_sender: Sender<VerifiedPackets>,
|
||||||
sigverify_disabled: bool,
|
sigverify_disabled: bool,
|
||||||
|
id: usize,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
spawn(move || loop {
|
Builder::new()
|
||||||
if let Err(e) = Self::verifier(&packet_receiver, &verified_sender, sigverify_disabled) {
|
.name(format!("solana-verifier-{}", id))
|
||||||
match e {
|
.spawn(move || loop {
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
if let Err(e) =
|
||||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
Self::verifier(&packet_receiver, &verified_sender, sigverify_disabled, id)
|
||||||
Error::SendError => {
|
{
|
||||||
break;
|
match e {
|
||||||
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||||
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||||
|
Error::SendError => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ => error!("{:?}", e),
|
||||||
}
|
}
|
||||||
_ => error!("{:?}", e),
|
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
})
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verifier_services(
|
fn verifier_services(
|
||||||
@ -129,11 +134,12 @@ impl SigVerifyStage {
|
|||||||
) -> Vec<JoinHandle<()>> {
|
) -> Vec<JoinHandle<()>> {
|
||||||
let receiver = Arc::new(Mutex::new(packet_receiver));
|
let receiver = Arc::new(Mutex::new(packet_receiver));
|
||||||
(0..4)
|
(0..4)
|
||||||
.map(|_| {
|
.map(|id| {
|
||||||
Self::verifier_service(
|
Self::verifier_service(
|
||||||
receiver.clone(),
|
receiver.clone(),
|
||||||
verified_sender.clone(),
|
verified_sender.clone(),
|
||||||
sigverify_disabled,
|
sigverify_disabled,
|
||||||
|
id,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
|
Reference in New Issue
Block a user