From ef6bd7e3b8c6c6c1a8577c961e5677f13bc5236c Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Mon, 14 May 2018 17:36:19 -0600 Subject: [PATCH] Add TPU --- src/lib.rs | 1 + src/tpu.rs | 112 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 src/tpu.rs diff --git a/src/lib.rs b/src/lib.rs index dfe8697fdd..993b04cb8c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ pub mod signature; pub mod streamer; pub mod thin_client; pub mod timing; +pub mod tpu; pub mod transaction; pub mod tvu; pub mod write_stage; diff --git a/src/tpu.rs b/src/tpu.rs new file mode 100644 index 0000000000..accda0d52d --- /dev/null +++ b/src/tpu.rs @@ -0,0 +1,112 @@ +//! The `tpu` module implements the Transaction Processing Unit, a +//! 5-stage transaction processing pipeline in software. + +use bank::Bank; +use banking_stage::BankingStage; +use crdt::{Crdt, ReplicatedData}; +use hash::Hash; +use packet; +use record_stage::RecordStage; +use result::Result; +use sig_verify_stage::SigVerifyStage; +use std::io::Write; +use std::net::UdpSocket; +use std::sync::atomic::AtomicBool; +use std::sync::mpsc::channel; +use std::sync::{Arc, Mutex, RwLock}; +use std::thread::JoinHandle; +use std::time::Duration; +use streamer; +use write_stage::WriteStage; + +pub struct Tpu { + bank: Arc, + start_hash: Hash, + tick_duration: Option, +} + +impl Tpu { + /// Create a new Tpu that wraps the given Bank. + pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option) -> Self { + Tpu { + bank: Arc::new(bank), + start_hash, + tick_duration, + } + } + + /// Create a UDP microservice that forwards messages the given Tpu. + /// This service is the network leader + /// Set `exit` to shutdown its threads. + pub fn serve( + &self, + me: ReplicatedData, + requests_socket: UdpSocket, + gossip: UdpSocket, + exit: Arc, + writer: W, + ) -> Result>> { + let crdt = Arc::new(RwLock::new(Crdt::new(me))); + let t_gossip = Crdt::gossip(crdt.clone(), exit.clone()); + let window = streamer::default_window(); + let t_listen = Crdt::listen(crdt.clone(), window.clone(), gossip, exit.clone()); + + // make sure we are on the same interface + let mut local = requests_socket.local_addr()?; + local.set_port(0); + + let packet_recycler = packet::PacketRecycler::default(); + let (packet_sender, packet_receiver) = channel(); + let t_receiver = streamer::receiver( + requests_socket, + exit.clone(), + packet_recycler.clone(), + packet_sender, + )?; + + let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver); + + let blob_recycler = packet::BlobRecycler::default(); + let banking_stage = BankingStage::new( + self.bank.clone(), + exit.clone(), + sig_verify_stage.verified_receiver, + packet_recycler.clone(), + ); + + let record_stage = RecordStage::new( + banking_stage.signal_receiver, + &self.start_hash, + self.tick_duration, + ); + + let write_stage = WriteStage::new( + self.bank.clone(), + exit.clone(), + blob_recycler.clone(), + Mutex::new(writer), + record_stage.entry_receiver, + ); + + let broadcast_socket = UdpSocket::bind(local)?; + let t_broadcast = streamer::broadcaster( + broadcast_socket, + exit.clone(), + crdt.clone(), + window, + blob_recycler.clone(), + write_stage.blob_receiver, + ); + + let mut threads = vec![ + t_receiver, + banking_stage.thread_hdl, + write_stage.thread_hdl, + t_gossip, + t_listen, + t_broadcast, + ]; + threads.extend(sig_verify_stage.thread_hdls.into_iter()); + Ok(threads) + } +}