//! The `fetch_stage` batches input from a UDP socket and sends it to a channel. use packet::PacketRecycler; use service::Service; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::Arc; use std::thread::{self, JoinHandle}; use streamer::{self, PacketReceiver}; pub struct FetchStage { exit: Arc, thread_hdls: Vec>, } impl FetchStage { pub fn new( socket: UdpSocket, exit: Arc, packet_recycler: &PacketRecycler, ) -> (Self, PacketReceiver) { Self::new_multi_socket(vec![socket], exit, packet_recycler) } pub fn new_multi_socket( sockets: Vec, exit: Arc, packet_recycler: &PacketRecycler, ) -> (Self, PacketReceiver) { let (packet_sender, packet_receiver) = channel(); let thread_hdls: Vec<_> = sockets .into_iter() .map(|socket| { streamer::receiver( socket, exit.clone(), packet_recycler.clone(), packet_sender.clone(), ) }) .collect(); (FetchStage { exit, thread_hdls }, packet_receiver) } pub fn close(&self) { self.exit.store(true, Ordering::Relaxed); } } impl Service for FetchStage { fn thread_hdls(self) -> Vec> { self.thread_hdls } fn join(self) -> thread::Result<()> { for thread_hdl in self.thread_hdls() { thread_hdl.join()?; } Ok(()) } }