diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 0514046961..b1d0120594 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -263,7 +263,7 @@ impl Replicator { .map(Arc::new) .collect(); let (blob_fetch_sender, blob_fetch_receiver) = channel(); - let fetch_stage = ShredFetchStage::new_multi_socket( + let fetch_stage = ShredFetchStage::new( blob_sockets, blob_forward_sockets, &blob_fetch_sender, diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index bc4c9289c3..be03f36b7c 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -1,13 +1,11 @@ //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel. use crate::recycler::Recycler; -use crate::result; -use crate::result::Error; use crate::service::Service; -use crate::streamer::{self, PacketReceiver, PacketSender}; +use crate::streamer::{self, PacketSender}; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; -use std::sync::mpsc::{channel, RecvTimeoutError}; +use std::sync::mpsc::channel; use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; @@ -16,30 +14,7 @@ pub struct ShredFetchStage { } impl ShredFetchStage { - fn handle_forwarded_packets( - recvr: &PacketReceiver, - sendr: &PacketSender, - ) -> result::Result<()> { - let msgs = recvr.recv()?; - let mut batch = vec![msgs]; - while let Ok(more) = recvr.try_recv() { - batch.push(more); - } - - batch - .iter_mut() - .for_each(|b| b.packets.iter_mut().for_each(|p| p.meta.forward = true)); - - for packets in batch { - if sendr.send(packets).is_err() { - return Err(Error::SendError); - } - } - - Ok(()) - } - - pub fn new_multi_socket( + pub fn new( sockets: Vec>, forward_sockets: Vec>, sender: &PacketSender, @@ -70,14 +45,11 @@ impl ShredFetchStage { let sender = sender.clone(); let fwd_thread_hdl = Builder::new() .name("solana-tvu-fetch-stage-fwd-rcvr".to_string()) - .spawn(move || loop { - if let Err(e) = Self::handle_forwarded_packets(&forward_receiver, &sender) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - Error::RecvError(_) => break, - Error::SendError => break, - _ => error!("{:?}", e), + .spawn(move || { + while let Some(mut p) = forward_receiver.iter().next() { + p.packets.iter_mut().for_each(|p| p.meta.forward = true); + if sender.send(p).is_err() { + break; } } }) diff --git a/core/src/tvu.rs b/core/src/tvu.rs index dbe3e530d0..bc9c3e4c1f 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -99,17 +99,13 @@ impl Tvu { let (fetch_sender, fetch_receiver) = channel(); let repair_socket = Arc::new(repair_socket); - let mut blob_sockets: Vec> = + let mut fetch_sockets: Vec> = fetch_sockets.into_iter().map(Arc::new).collect(); - blob_sockets.push(repair_socket.clone()); - let blob_forward_sockets: Vec> = + fetch_sockets.push(repair_socket.clone()); + let forward_sockets: Vec> = tvu_forward_sockets.into_iter().map(Arc::new).collect(); - let fetch_stage = ShredFetchStage::new_multi_socket( - blob_sockets, - blob_forward_sockets, - &fetch_sender, - &exit, - ); + let fetch_stage = + ShredFetchStage::new(fetch_sockets, forward_sockets, &fetch_sender, &exit); //TODO //the packets coming out of blob_receiver need to be sent to the GPU and verified