From d22a1c9b1ff35b5914050833c03b31663d3a0928 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Tue, 23 Apr 2019 12:41:50 -0700 Subject: [PATCH] Use smaller batch size in sigverify stage if CUDA is not available (#3951) --- core/src/sigverify_stage.rs | 12 ++++++++++-- core/src/streamer.rs | 6 ++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 550243f0da..abe6f21fd8 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -18,6 +18,12 @@ use std::sync::{Arc, Mutex}; use std::thread::{self, Builder, JoinHandle}; use std::time::Instant; +#[cfg(feature = "cuda")] +const RECV_BATCH_MAX: usize = 60_000; + +#[cfg(not(feature = "cuda"))] +const RECV_BATCH_MAX: usize = 1000; + pub type VerifiedPackets = Vec<(Packets, Vec)>; pub struct SigVerifyStage { @@ -52,8 +58,10 @@ impl SigVerifyStage { sigverify_disabled: bool, id: usize, ) -> Result<()> { - let (batch, len, recv_time) = - streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?; + let (batch, len, recv_time) = streamer::recv_batch( + &recvr.lock().expect("'recvr' lock in fn verifier"), + RECV_BATCH_MAX, + )?; inc_new_counter_info!("sigverify_stage-packets_received", len); let now = Instant::now(); diff --git a/core/src/streamer.rs b/core/src/streamer.rs index 51a7f2e32e..e74a7a38ae 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -19,8 +19,6 @@ pub type PacketSender = Sender; pub type BlobSender = Sender; pub type BlobReceiver = Receiver; -const RECV_BATCH_MAX: usize = 60_000; - fn recv_loop(sock: &UdpSocket, exit: Arc, channel: &PacketSender) -> Result<()> { loop { let mut msgs = Packets::default(); @@ -63,7 +61,7 @@ fn recv_send(sock: &UdpSocket, r: &BlobReceiver) -> Result<()> { Ok(()) } -pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize, u64)> { +pub fn recv_batch(recvr: &PacketReceiver, max_batch: usize) -> Result<(Vec, usize, u64)> { let timer = Duration::new(1, 0); let msgs = recvr.recv_timeout(timer)?; let recv_start = Instant::now(); @@ -75,7 +73,7 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize, u64)> len += more.packets.len(); batch.push(more); - if len > RECV_BATCH_MAX { + if len > max_batch { break; } }