From f384a2ce85fff63c3770f1080efefda7cfb9067d Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 10 May 2018 15:47:42 -0600 Subject: [PATCH] Move streamer-specific utility into streamer module --- src/streamer.rs | 19 +++++++++++++++++++ src/tpu.rs | 21 +-------------------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/streamer.rs b/src/streamer.rs index 1a607a12f0..13a8d04a28 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -64,6 +64,25 @@ fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Res Ok(()) } +pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize)> { + let timer = Duration::new(1, 0); + let msgs = recvr.recv_timeout(timer)?; + debug!("got msgs"); + let mut len = msgs.read().unwrap().packets.len(); + let mut batch = vec![msgs]; + while let Ok(more) = recvr.try_recv() { + trace!("got more msgs"); + len += more.read().unwrap().packets.len(); + batch.push(more); + + if len > 100_000 { + break; + } + } + debug!("batch len {}", batch.len()); + Ok((batch, len)) +} + pub fn responder( sock: UdpSocket, exit: Arc, diff --git a/src/tpu.rs b/src/tpu.rs index 0badedf3d5..dbab29380e 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -124,25 +124,6 @@ impl Tpu { }) } - fn recv_batch(recvr: &streamer::PacketReceiver) -> Result<(Vec, usize)> { - let timer = Duration::new(1, 0); - let msgs = recvr.recv_timeout(timer)?; - debug!("got msgs"); - let mut len = msgs.read().unwrap().packets.len(); - let mut batch = vec![msgs]; - while let Ok(more) = recvr.try_recv() { - trace!("got more msgs"); - len += more.read().unwrap().packets.len(); - batch.push(more); - - if len > 100_000 { - break; - } - } - debug!("batch len {}", batch.len()); - Ok((batch, len)) - } - fn verify_batch( batch: Vec, sendr: &Arc)>>>>, @@ -158,7 +139,7 @@ impl Tpu { recvr: &Arc>, sendr: &Arc)>>>>, ) -> Result<()> { - let (batch, len) = Self::recv_batch(&recvr.lock().unwrap())?; + let (batch, len) = streamer::recv_batch(&recvr.lock().unwrap())?; let now = Instant::now(); let batch_len = batch.len(); let rand_id = thread_rng().gen_range(0, 100);