diff --git a/src/lib.rs b/src/lib.rs index 7439d16a81..4711fed1c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,8 @@ pub mod mint; pub mod packet; pub mod plan; pub mod record_stage; +pub mod request_replicator; +pub mod replicate_stage; pub mod recorder; pub mod request; pub mod request_processor; diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 6951f57e03..0bd259f904 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -1,22 +1,11 @@ //! The `replicate_stage` replicates transactions broadcast by the leader. -use bank::Bank; -use banking_stage::BankingStage; -use crdt::{Crdt, ReplicatedData}; -use hash::Hash; -use ledger; use packet; -use record_stage::RecordStage; -use result::Result; -use sig_verify_stage::SigVerifyStage; -use std::net::UdpSocket; +use request_replicator::RequestReplicator; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::channel; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::thread::{spawn, JoinHandle}; -use std::time::Duration; use streamer; -use write_stage::WriteStage; pub struct ReplicateStage { pub thread_hdl: JoinHandle<()>, @@ -24,13 +13,13 @@ pub struct ReplicateStage { impl ReplicateStage { - pub fn new(request_replicator: RequestReplicator, exit: Arc, window_receiver: streamer::BlobReceiver, blob_recycler: &packet::BlobRecycler) -> Self { + pub fn new(request_replicator: RequestReplicator, exit: Arc, window_receiver: streamer::BlobReceiver, blob_recycler: packet::BlobRecycler) -> Self { let thread_hdl = spawn(move || loop { let e = request_replicator.replicate_requests(&window_receiver, &blob_recycler); - if e.is_err() && s_exit.load(Ordering::Relaxed) { + if e.is_err() && exit.load(Ordering::Relaxed) { break; } }); - ReplicateStage{thread_hdl}; + ReplicateStage{thread_hdl} } } diff --git a/src/request_replicator.rs b/src/request_replicator.rs index bfb5ef6099..3408af8a30 100644 --- a/src/request_replicator.rs +++ b/src/request_replicator.rs @@ -2,32 +2,22 @@ //! by the leader. use bank::Bank; -use banking_stage::BankingStage; -use crdt::{Crdt, ReplicatedData}; -use hash::Hash; use ledger; use packet; -use record_stage::RecordStage; use result::Result; -use sig_verify_stage::SigVerifyStage; -use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::channel; -use std::sync::{Arc, RwLock}; -use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; -use write_stage::WriteStage; +use std::sync::Arc; pub struct RequestReplicator { bank: Arc, } -impl Tvu { +impl RequestReplicator { /// Create a new Tvu that wraps the given Bank. - pub fn new(bank: Bank) -> Self { + pub fn new(bank: Arc) -> Self { RequestReplicator { - bank: Arc::new(bank), + bank: bank, } } diff --git a/src/tvu.rs b/src/tvu.rs index f0ec2ccac6..31e291cb7b 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -5,16 +5,17 @@ use bank::Bank; use banking_stage::BankingStage; use crdt::{Crdt, ReplicatedData}; use hash::Hash; -use ledger; use packet; use record_stage::RecordStage; +use request_replicator::RequestReplicator; +use replicate_stage::ReplicateStage; use result::Result; use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; -use std::thread::{spawn, JoinHandle}; +use std::thread::JoinHandle; use std::time::Duration; use streamer; use write_stage::WriteStage; @@ -35,27 +36,6 @@ impl Tvu { } } - /// Process verified blobs, already in order - /// Respond with a signed hash of the state - fn replicate_state( - obj: &Tvu, - verified_receiver: &streamer::BlobReceiver, - blob_recycler: &packet::BlobRecycler, - ) -> Result<()> { - let timer = Duration::new(1, 0); - let blobs = verified_receiver.recv_timeout(timer)?; - let entries = ledger::reconstruct_entries_from_blobs(&blobs); - let res = obj.bank.process_verified_entries(entries); - if res.is_err() { - error!("process_verified_entries {} {:?}", blobs.len(), res); - } - res?; - for blob in blobs { - blob_recycler.recycle(blob); - } - Ok(()) - } - /// This service receives messages from a leader in the network and processes the transactions /// on the bank state. /// # Arguments @@ -132,14 +112,13 @@ impl Tvu { retransmit_sender, ); - let tvu = obj.clone(); - let s_exit = exit.clone(); - let t_replicator = spawn(move || loop { - let e = Self::replicate_state(&tvu, &window_receiver, &blob_recycler); - if e.is_err() && s_exit.load(Ordering::Relaxed) { - break; - } - }); + let request_replicator = RequestReplicator::new(obj.bank.clone()); + let replicate_stage = ReplicateStage::new( + request_replicator, + exit.clone(), + window_receiver, + blob_recycler.clone(), + ); //serve pipeline // make sure we are on the same interface @@ -178,7 +157,7 @@ impl Tvu { t_blob_receiver, t_retransmit, t_window, - t_replicator, + replicate_stage.thread_hdl, t_gossip, t_listen, //serve threads @@ -375,5 +354,4 @@ mod tests { t_l_gossip.join().expect("join"); t_l_listen.join().expect("join"); } - }