diff --git a/src/fetch_stage.rs b/src/fetch_stage.rs new file mode 100644 index 0000000000..71e633d67b --- /dev/null +++ b/src/fetch_stage.rs @@ -0,0 +1,31 @@ +//! The `fetch_stage` batches input from a UDP socket and sends it to a channel. + +use packet; +use std::net::UdpSocket; +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::mpsc::channel; +use std::thread::JoinHandle; +use streamer; + +pub struct FetchStage { + pub packet_receiver: streamer::PacketReceiver, + pub thread_hdl: JoinHandle<()>, +} + +impl FetchStage { + pub fn new( + socket: UdpSocket, + exit: Arc, + packet_recycler: packet::PacketRecycler, + ) -> Self { + let (packet_sender, packet_receiver) = channel(); + let thread_hdl = + streamer::receiver(socket, exit.clone(), packet_recycler.clone(), packet_sender); + + FetchStage { + packet_receiver, + thread_hdl, + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 184da6fdee..7ce12c2444 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ pub mod entry; pub mod entry_writer; #[cfg(feature = "erasure")] pub mod erasure; +pub mod fetch_stage; pub mod hash; pub mod ledger; pub mod logger; diff --git a/src/tpu.rs b/src/tpu.rs index b7758b4460..9737f27fdf 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -3,22 +3,22 @@ use bank::Bank; use banking_stage::BankingStage; +use fetch_stage::FetchStage; use hash::Hash; -use packet; +use packet::{BlobRecycler, PacketRecycler}; use record_stage::RecordStage; use sigverify_stage::SigVerifyStage; use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; -use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; use std::thread::JoinHandle; use std::time::Duration; -use streamer; +use streamer::BlobReceiver; use write_stage::WriteStage; pub struct Tpu { - pub blob_receiver: streamer::BlobReceiver, + pub blob_receiver: BlobReceiver, pub thread_hdls: Vec>, } @@ -28,20 +28,16 @@ impl Tpu { start_hash: Hash, tick_duration: Option, transactions_socket: UdpSocket, - blob_recycler: packet::BlobRecycler, + blob_recycler: BlobRecycler, exit: Arc, writer: W, ) -> Self { - let packet_recycler = packet::PacketRecycler::default(); - let (packet_sender, packet_receiver) = channel(); - let t_receiver = streamer::receiver( - transactions_socket, - exit.clone(), - packet_recycler.clone(), - packet_sender, - ); + let packet_recycler = PacketRecycler::default(); - let sigverify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); + let fetch_stage = + FetchStage::new(transactions_socket, exit.clone(), packet_recycler.clone()); + + let sigverify_stage = SigVerifyStage::new(exit.clone(), fetch_stage.packet_receiver); let banking_stage = BankingStage::new( bank.clone(), @@ -61,16 +57,15 @@ impl Tpu { record_stage.entry_receiver, ); - let blob_receiver = write_stage.blob_receiver; let mut thread_hdls = vec![ - t_receiver, + fetch_stage.thread_hdl, banking_stage.thread_hdl, record_stage.thread_hdl, write_stage.thread_hdl, ]; thread_hdls.extend(sigverify_stage.thread_hdls.into_iter()); Tpu { - blob_receiver, + blob_receiver: write_stage.blob_receiver, thread_hdls, } }