diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 217ac25c49..e40588376b 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -32,18 +32,25 @@ impl FetchStage { sender: &PacketSender, ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); - Self::new_multi_socket(tx_sockets, exit, &sender) + let forwarder_sockets = forwarder_sockets.into_iter().map(Arc::new).collect(); + Self::new_multi_socket(tx_sockets, forwarder_sockets, exit, &sender) } + fn new_multi_socket( sockets: Vec>, + forwarder_sockets: Vec>, exit: &Arc, sender: &PacketSender, ) -> Self { - let thread_hdls: Vec<_> = sockets + let tpu_threads = sockets .into_iter() - .map(|socket| streamer::receiver(socket, &exit, sender.clone(), "fetch-stage")) - .collect(); + .map(|socket| streamer::receiver(socket, &exit, sender.clone(), "fetch-stage")); + let forwarder_threads = forwarder_sockets + .into_iter() + .map(|socket| streamer::blob_packet_receiver(socket, &exit, sender.clone())); + + let thread_hdls: Vec<_> = tpu_threads.chain(forwarder_threads).collect(); Self { thread_hdls } } } diff --git a/core/src/packet.rs b/core/src/packet.rs index eb88f756c9..2824195878 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -115,6 +115,10 @@ impl Default for Packets { } impl Packets { + pub fn new(packets: Vec) -> Self { + Self { packets } + } + pub fn set_addr(&mut self, addr: &SocketAddr) { for m in self.packets.iter_mut() { m.meta.set_addr(&addr); diff --git a/core/src/streamer.rs b/core/src/streamer.rs index 95de7a8805..fed499f7fa 100644 --- a/core/src/streamer.rs +++ b/core/src/streamer.rs @@ -1,14 +1,15 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! -use crate::packet::{Blob, SharedBlobs, SharedPackets}; +use crate::packet::{Blob, Packet, Packets, SharedBlobs, SharedPackets}; use crate::result::{Error, Result}; +use bincode::deserialize; use solana_metrics::{influxdb, submit}; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::thread::{Builder, JoinHandle}; use std::time::{Duration, Instant}; @@ -139,6 +140,46 @@ pub fn blob_receiver( .unwrap() } +fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> { + trace!( + "recv_blob_packets: receiving on {}", + sock.local_addr().unwrap() + ); + let blobs = Blob::recv_from(sock)?; + for blob in blobs { + let msgs: Vec = { + let r_blob = blob.read().unwrap(); + let msg_size = r_blob.size(); + deserialize(&r_blob.data()[..msg_size])? + }; + s.send(Arc::new(RwLock::new(Packets::new(msgs))))?; + } + + Ok(()) +} + +pub fn blob_packet_receiver( + sock: Arc, + exit: &Arc, + s: PacketSender, +) -> JoinHandle<()> { + //DOCUMENTED SIDE-EFFECT + //1 second timeout on socket read + let timer = Duration::new(1, 0); + sock.set_read_timeout(Some(timer)) + .expect("set socket timeout"); + let exit = exit.clone(); + Builder::new() + .name("solana-blob_packet_receiver".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + let _ = recv_blob_packets(&sock, &s); + }) + .unwrap() +} + #[cfg(test)] mod test { use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};