Finally made fetch happen

This commit is contained in:
Greg Fitzgerald
2018-05-29 11:18:12 -06:00
parent 5e824b39dd
commit f2ccc133a2
3 changed files with 44 additions and 17 deletions

31
src/fetch_stage.rs Normal file
View File

@ -0,0 +1,31 @@
//! The `fetch_stage` batches input from a UDP socket and sends it to a channel.
use packet;
use std::net::UdpSocket;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::thread::JoinHandle;
use streamer;
pub struct FetchStage {
pub packet_receiver: streamer::PacketReceiver,
pub thread_hdl: JoinHandle<()>,
}
impl FetchStage {
pub fn new(
socket: UdpSocket,
exit: Arc<AtomicBool>,
packet_recycler: packet::PacketRecycler,
) -> Self {
let (packet_sender, packet_receiver) = channel();
let thread_hdl =
streamer::receiver(socket, exit.clone(), packet_recycler.clone(), packet_sender);
FetchStage {
packet_receiver,
thread_hdl,
}
}
}

View File

@ -6,6 +6,7 @@ pub mod entry;
pub mod entry_writer; pub mod entry_writer;
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
pub mod erasure; pub mod erasure;
pub mod fetch_stage;
pub mod hash; pub mod hash;
pub mod ledger; pub mod ledger;
pub mod logger; pub mod logger;

View File

@ -3,22 +3,22 @@
use bank::Bank; use bank::Bank;
use banking_stage::BankingStage; use banking_stage::BankingStage;
use fetch_stage::FetchStage;
use hash::Hash; use hash::Hash;
use packet; use packet::{BlobRecycler, PacketRecycler};
use record_stage::RecordStage; use record_stage::RecordStage;
use sigverify_stage::SigVerifyStage; use sigverify_stage::SigVerifyStage;
use std::io::Write; use std::io::Write;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread::JoinHandle; use std::thread::JoinHandle;
use std::time::Duration; use std::time::Duration;
use streamer; use streamer::BlobReceiver;
use write_stage::WriteStage; use write_stage::WriteStage;
pub struct Tpu { pub struct Tpu {
pub blob_receiver: streamer::BlobReceiver, pub blob_receiver: BlobReceiver,
pub thread_hdls: Vec<JoinHandle<()>>, pub thread_hdls: Vec<JoinHandle<()>>,
} }
@ -28,20 +28,16 @@ impl Tpu {
start_hash: Hash, start_hash: Hash,
tick_duration: Option<Duration>, tick_duration: Option<Duration>,
transactions_socket: UdpSocket, transactions_socket: UdpSocket,
blob_recycler: packet::BlobRecycler, blob_recycler: BlobRecycler,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
writer: W, writer: W,
) -> Self { ) -> Self {
let packet_recycler = packet::PacketRecycler::default(); let packet_recycler = PacketRecycler::default();
let (packet_sender, packet_receiver) = channel();
let t_receiver = streamer::receiver(
transactions_socket,
exit.clone(),
packet_recycler.clone(),
packet_sender,
);
let sigverify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); let fetch_stage =
FetchStage::new(transactions_socket, exit.clone(), packet_recycler.clone());
let sigverify_stage = SigVerifyStage::new(exit.clone(), fetch_stage.packet_receiver);
let banking_stage = BankingStage::new( let banking_stage = BankingStage::new(
bank.clone(), bank.clone(),
@ -61,16 +57,15 @@ impl Tpu {
record_stage.entry_receiver, record_stage.entry_receiver,
); );
let blob_receiver = write_stage.blob_receiver;
let mut thread_hdls = vec![ let mut thread_hdls = vec![
t_receiver, fetch_stage.thread_hdl,
banking_stage.thread_hdl, banking_stage.thread_hdl,
record_stage.thread_hdl, record_stage.thread_hdl,
write_stage.thread_hdl, write_stage.thread_hdl,
]; ];
thread_hdls.extend(sigverify_stage.thread_hdls.into_iter()); thread_hdls.extend(sigverify_stage.thread_hdls.into_iter());
Tpu { Tpu {
blob_receiver, blob_receiver: write_stage.blob_receiver,
thread_hdls, thread_hdls,
} }
} }