Add separate sockets for tpu forwarder and run different protocol for those sockets
This commit is contained in:
		@@ -32,18 +32,25 @@ impl FetchStage {
 | 
				
			|||||||
        sender: &PacketSender,
 | 
					        sender: &PacketSender,
 | 
				
			||||||
    ) -> Self {
 | 
					    ) -> Self {
 | 
				
			||||||
        let tx_sockets = sockets.into_iter().map(Arc::new).collect();
 | 
					        let tx_sockets = sockets.into_iter().map(Arc::new).collect();
 | 
				
			||||||
        Self::new_multi_socket(tx_sockets, exit, &sender)
 | 
					        let forwarder_sockets = forwarder_sockets.into_iter().map(Arc::new).collect();
 | 
				
			||||||
 | 
					        Self::new_multi_socket(tx_sockets, forwarder_sockets, exit, &sender)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn new_multi_socket(
 | 
					    fn new_multi_socket(
 | 
				
			||||||
        sockets: Vec<Arc<UdpSocket>>,
 | 
					        sockets: Vec<Arc<UdpSocket>>,
 | 
				
			||||||
 | 
					        forwarder_sockets: Vec<Arc<UdpSocket>>,
 | 
				
			||||||
        exit: &Arc<AtomicBool>,
 | 
					        exit: &Arc<AtomicBool>,
 | 
				
			||||||
        sender: &PacketSender,
 | 
					        sender: &PacketSender,
 | 
				
			||||||
    ) -> Self {
 | 
					    ) -> Self {
 | 
				
			||||||
        let thread_hdls: Vec<_> = sockets
 | 
					        let tpu_threads = sockets
 | 
				
			||||||
            .into_iter()
 | 
					            .into_iter()
 | 
				
			||||||
            .map(|socket| streamer::receiver(socket, &exit, sender.clone(), "fetch-stage"))
 | 
					            .map(|socket| streamer::receiver(socket, &exit, sender.clone(), "fetch-stage"));
 | 
				
			||||||
            .collect();
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let forwarder_threads = forwarder_sockets
 | 
				
			||||||
 | 
					            .into_iter()
 | 
				
			||||||
 | 
					            .map(|socket| streamer::blob_packet_receiver(socket, &exit, sender.clone()));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let thread_hdls: Vec<_> = tpu_threads.chain(forwarder_threads).collect();
 | 
				
			||||||
        Self { thread_hdls }
 | 
					        Self { thread_hdls }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -115,6 +115,10 @@ impl Default for Packets {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl Packets {
 | 
					impl Packets {
 | 
				
			||||||
 | 
					    pub fn new(packets: Vec<Packet>) -> Self {
 | 
				
			||||||
 | 
					        Self { packets }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn set_addr(&mut self, addr: &SocketAddr) {
 | 
					    pub fn set_addr(&mut self, addr: &SocketAddr) {
 | 
				
			||||||
        for m in self.packets.iter_mut() {
 | 
					        for m in self.packets.iter_mut() {
 | 
				
			||||||
            m.meta.set_addr(&addr);
 | 
					            m.meta.set_addr(&addr);
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,14 +1,15 @@
 | 
				
			|||||||
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
 | 
					//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
 | 
				
			||||||
//!
 | 
					//!
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use crate::packet::{Blob, SharedBlobs, SharedPackets};
 | 
					use crate::packet::{Blob, Packet, Packets, SharedBlobs, SharedPackets};
 | 
				
			||||||
use crate::result::{Error, Result};
 | 
					use crate::result::{Error, Result};
 | 
				
			||||||
 | 
					use bincode::deserialize;
 | 
				
			||||||
use solana_metrics::{influxdb, submit};
 | 
					use solana_metrics::{influxdb, submit};
 | 
				
			||||||
use solana_sdk::timing::duration_as_ms;
 | 
					use solana_sdk::timing::duration_as_ms;
 | 
				
			||||||
use std::net::UdpSocket;
 | 
					use std::net::UdpSocket;
 | 
				
			||||||
use std::sync::atomic::{AtomicBool, Ordering};
 | 
					use std::sync::atomic::{AtomicBool, Ordering};
 | 
				
			||||||
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
 | 
					use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
 | 
				
			||||||
use std::sync::Arc;
 | 
					use std::sync::{Arc, RwLock};
 | 
				
			||||||
use std::thread::{Builder, JoinHandle};
 | 
					use std::thread::{Builder, JoinHandle};
 | 
				
			||||||
use std::time::{Duration, Instant};
 | 
					use std::time::{Duration, Instant};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -139,6 +140,46 @@ pub fn blob_receiver(
 | 
				
			|||||||
        .unwrap()
 | 
					        .unwrap()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					fn recv_blob_packets(sock: &UdpSocket, s: &PacketSender) -> Result<()> {
 | 
				
			||||||
 | 
					    trace!(
 | 
				
			||||||
 | 
					        "recv_blob_packets: receiving on {}",
 | 
				
			||||||
 | 
					        sock.local_addr().unwrap()
 | 
				
			||||||
 | 
					    );
 | 
				
			||||||
 | 
					    let blobs = Blob::recv_from(sock)?;
 | 
				
			||||||
 | 
					    for blob in blobs {
 | 
				
			||||||
 | 
					        let msgs: Vec<Packet> = {
 | 
				
			||||||
 | 
					            let r_blob = blob.read().unwrap();
 | 
				
			||||||
 | 
					            let msg_size = r_blob.size();
 | 
				
			||||||
 | 
					            deserialize(&r_blob.data()[..msg_size])?
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					        s.send(Arc::new(RwLock::new(Packets::new(msgs))))?;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Ok(())
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub fn blob_packet_receiver(
 | 
				
			||||||
 | 
					    sock: Arc<UdpSocket>,
 | 
				
			||||||
 | 
					    exit: &Arc<AtomicBool>,
 | 
				
			||||||
 | 
					    s: PacketSender,
 | 
				
			||||||
 | 
					) -> JoinHandle<()> {
 | 
				
			||||||
 | 
					    //DOCUMENTED SIDE-EFFECT
 | 
				
			||||||
 | 
					    //1 second timeout on socket read
 | 
				
			||||||
 | 
					    let timer = Duration::new(1, 0);
 | 
				
			||||||
 | 
					    sock.set_read_timeout(Some(timer))
 | 
				
			||||||
 | 
					        .expect("set socket timeout");
 | 
				
			||||||
 | 
					    let exit = exit.clone();
 | 
				
			||||||
 | 
					    Builder::new()
 | 
				
			||||||
 | 
					        .name("solana-blob_packet_receiver".to_string())
 | 
				
			||||||
 | 
					        .spawn(move || loop {
 | 
				
			||||||
 | 
					            if exit.load(Ordering::Relaxed) {
 | 
				
			||||||
 | 
					                break;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            let _ = recv_blob_packets(&sock, &s);
 | 
				
			||||||
 | 
					        })
 | 
				
			||||||
 | 
					        .unwrap()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[cfg(test)]
 | 
					#[cfg(test)]
 | 
				
			||||||
mod test {
 | 
					mod test {
 | 
				
			||||||
    use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
 | 
					    use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user