Use smaller batch size in sigverify stage if CUDA is not available (#3951)
This commit is contained in:
@ -18,6 +18,12 @@ use std::sync::{Arc, Mutex};
|
|||||||
use std::thread::{self, Builder, JoinHandle};
|
use std::thread::{self, Builder, JoinHandle};
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
|
#[cfg(feature = "cuda")]
|
||||||
|
const RECV_BATCH_MAX: usize = 60_000;
|
||||||
|
|
||||||
|
#[cfg(not(feature = "cuda"))]
|
||||||
|
const RECV_BATCH_MAX: usize = 1000;
|
||||||
|
|
||||||
pub type VerifiedPackets = Vec<(Packets, Vec<u8>)>;
|
pub type VerifiedPackets = Vec<(Packets, Vec<u8>)>;
|
||||||
|
|
||||||
pub struct SigVerifyStage {
|
pub struct SigVerifyStage {
|
||||||
@ -52,8 +58,10 @@ impl SigVerifyStage {
|
|||||||
sigverify_disabled: bool,
|
sigverify_disabled: bool,
|
||||||
id: usize,
|
id: usize,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let (batch, len, recv_time) =
|
let (batch, len, recv_time) = streamer::recv_batch(
|
||||||
streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
|
&recvr.lock().expect("'recvr' lock in fn verifier"),
|
||||||
|
RECV_BATCH_MAX,
|
||||||
|
)?;
|
||||||
inc_new_counter_info!("sigverify_stage-packets_received", len);
|
inc_new_counter_info!("sigverify_stage-packets_received", len);
|
||||||
|
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
@ -19,8 +19,6 @@ pub type PacketSender = Sender<Packets>;
|
|||||||
pub type BlobSender = Sender<SharedBlobs>;
|
pub type BlobSender = Sender<SharedBlobs>;
|
||||||
pub type BlobReceiver = Receiver<SharedBlobs>;
|
pub type BlobReceiver = Receiver<SharedBlobs>;
|
||||||
|
|
||||||
const RECV_BATCH_MAX: usize = 60_000;
|
|
||||||
|
|
||||||
fn recv_loop(sock: &UdpSocket, exit: Arc<AtomicBool>, channel: &PacketSender) -> Result<()> {
|
fn recv_loop(sock: &UdpSocket, exit: Arc<AtomicBool>, channel: &PacketSender) -> Result<()> {
|
||||||
loop {
|
loop {
|
||||||
let mut msgs = Packets::default();
|
let mut msgs = Packets::default();
|
||||||
@ -63,7 +61,7 @@ fn recv_send(sock: &UdpSocket, r: &BlobReceiver) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<Packets>, usize, u64)> {
|
pub fn recv_batch(recvr: &PacketReceiver, max_batch: usize) -> Result<(Vec<Packets>, usize, u64)> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let msgs = recvr.recv_timeout(timer)?;
|
let msgs = recvr.recv_timeout(timer)?;
|
||||||
let recv_start = Instant::now();
|
let recv_start = Instant::now();
|
||||||
@ -75,7 +73,7 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<Packets>, usize, u64)>
|
|||||||
len += more.packets.len();
|
len += more.packets.len();
|
||||||
batch.push(more);
|
batch.push(more);
|
||||||
|
|
||||||
if len > RECV_BATCH_MAX {
|
if len > max_batch {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user