From 26272a3600c59aca045ccc575d3114cb31dc7f05 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 22 May 2018 14:26:28 -0700 Subject: [PATCH 1/4] split out stages --- src/replicate_stage.rs | 36 ++++++++++++++++++++++++++ src/request_replicator.rs | 53 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 src/replicate_stage.rs create mode 100644 src/request_replicator.rs diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs new file mode 100644 index 0000000000..6951f57e03 --- /dev/null +++ b/src/replicate_stage.rs @@ -0,0 +1,36 @@ +//! The `replicate_stage` replicates transactions broadcast by the leader. + +use bank::Bank; +use banking_stage::BankingStage; +use crdt::{Crdt, ReplicatedData}; +use hash::Hash; +use ledger; +use packet; +use record_stage::RecordStage; +use result::Result; +use sig_verify_stage::SigVerifyStage; +use std::net::UdpSocket; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::channel; +use std::sync::{Arc, RwLock}; +use std::thread::{spawn, JoinHandle}; +use std::time::Duration; +use streamer; +use write_stage::WriteStage; + +pub struct ReplicateStage { + pub thread_hdl: JoinHandle<()>, +} + +impl ReplicateStage { + + pub fn new(request_replicator: RequestReplicator, exit: Arc, window_receiver: streamer::BlobReceiver, blob_recycler: &packet::BlobRecycler) -> Self { + let thread_hdl = spawn(move || loop { + let e = request_replicator.replicate_requests(&window_receiver, &blob_recycler); + if e.is_err() && s_exit.load(Ordering::Relaxed) { + break; + } + }); + ReplicateStage{thread_hdl}; + } +} diff --git a/src/request_replicator.rs b/src/request_replicator.rs new file mode 100644 index 0000000000..bfb5ef6099 --- /dev/null +++ b/src/request_replicator.rs @@ -0,0 +1,53 @@ +//! The `request_replicator` is part of `replicator_stage` which replicates transactions broadcast +//! by the leader. + +use bank::Bank; +use banking_stage::BankingStage; +use crdt::{Crdt, ReplicatedData}; +use hash::Hash; +use ledger; +use packet; +use record_stage::RecordStage; +use result::Result; +use sig_verify_stage::SigVerifyStage; +use std::net::UdpSocket; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::channel; +use std::sync::{Arc, RwLock}; +use std::thread::{spawn, JoinHandle}; +use std::time::Duration; +use streamer; +use write_stage::WriteStage; + +pub struct RequestReplicator { + bank: Arc, +} + +impl Tvu { + /// Create a new Tvu that wraps the given Bank. + pub fn new(bank: Bank) -> Self { + RequestReplicator { + bank: Arc::new(bank), + } + } + + /// Process verified blobs, already in order + pub fn replicate_requests( + &self, + verified_receiver: &streamer::BlobReceiver, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let blobs = verified_receiver.recv_timeout(timer)?; + let entries = ledger::reconstruct_entries_from_blobs(&blobs); + let res = self.bank.process_verified_entries(entries); + if res.is_err() { + error!("process_verified_entries {} {:?}", blobs.len(), res); + } + res?; + for blob in blobs { + blob_recycler.recycle(blob); + } + Ok(()) + } +} From a638ec5911a1a5f78c34ffef4ffeedc425870d6e Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 22 May 2018 15:17:59 -0700 Subject: [PATCH 2/4] builds --- src/lib.rs | 2 ++ src/replicate_stage.rs | 21 +++++------------- src/request_replicator.rs | 18 ++++----------- src/tvu.rs | 46 ++++++++++----------------------------- 4 files changed, 23 insertions(+), 64 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 7439d16a81..4711fed1c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,8 @@ pub mod mint; pub mod packet; pub mod plan; pub mod record_stage; +pub mod request_replicator; +pub mod replicate_stage; pub mod recorder; pub mod request; pub mod request_processor; diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 6951f57e03..0bd259f904 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -1,22 +1,11 @@ //! The `replicate_stage` replicates transactions broadcast by the leader. -use bank::Bank; -use banking_stage::BankingStage; -use crdt::{Crdt, ReplicatedData}; -use hash::Hash; -use ledger; use packet; -use record_stage::RecordStage; -use result::Result; -use sig_verify_stage::SigVerifyStage; -use std::net::UdpSocket; +use request_replicator::RequestReplicator; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::channel; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::thread::{spawn, JoinHandle}; -use std::time::Duration; use streamer; -use write_stage::WriteStage; pub struct ReplicateStage { pub thread_hdl: JoinHandle<()>, @@ -24,13 +13,13 @@ pub struct ReplicateStage { impl ReplicateStage { - pub fn new(request_replicator: RequestReplicator, exit: Arc, window_receiver: streamer::BlobReceiver, blob_recycler: &packet::BlobRecycler) -> Self { + pub fn new(request_replicator: RequestReplicator, exit: Arc, window_receiver: streamer::BlobReceiver, blob_recycler: packet::BlobRecycler) -> Self { let thread_hdl = spawn(move || loop { let e = request_replicator.replicate_requests(&window_receiver, &blob_recycler); - if e.is_err() && s_exit.load(Ordering::Relaxed) { + if e.is_err() && exit.load(Ordering::Relaxed) { break; } }); - ReplicateStage{thread_hdl}; + ReplicateStage{thread_hdl} } } diff --git a/src/request_replicator.rs b/src/request_replicator.rs index bfb5ef6099..3408af8a30 100644 --- a/src/request_replicator.rs +++ b/src/request_replicator.rs @@ -2,32 +2,22 @@ //! by the leader. use bank::Bank; -use banking_stage::BankingStage; -use crdt::{Crdt, ReplicatedData}; -use hash::Hash; use ledger; use packet; -use record_stage::RecordStage; use result::Result; -use sig_verify_stage::SigVerifyStage; -use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::channel; -use std::sync::{Arc, RwLock}; -use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; -use write_stage::WriteStage; +use std::sync::Arc; pub struct RequestReplicator { bank: Arc, } -impl Tvu { +impl RequestReplicator { /// Create a new Tvu that wraps the given Bank. - pub fn new(bank: Bank) -> Self { + pub fn new(bank: Arc) -> Self { RequestReplicator { - bank: Arc::new(bank), + bank: bank, } } diff --git a/src/tvu.rs b/src/tvu.rs index f0ec2ccac6..31e291cb7b 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -5,16 +5,17 @@ use bank::Bank; use banking_stage::BankingStage; use crdt::{Crdt, ReplicatedData}; use hash::Hash; -use ledger; use packet; use record_stage::RecordStage; +use request_replicator::RequestReplicator; +use replicate_stage::ReplicateStage; use result::Result; use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; -use std::thread::{spawn, JoinHandle}; +use std::thread::JoinHandle; use std::time::Duration; use streamer; use write_stage::WriteStage; @@ -35,27 +36,6 @@ impl Tvu { } } - /// Process verified blobs, already in order - /// Respond with a signed hash of the state - fn replicate_state( - obj: &Tvu, - verified_receiver: &streamer::BlobReceiver, - blob_recycler: &packet::BlobRecycler, - ) -> Result<()> { - let timer = Duration::new(1, 0); - let blobs = verified_receiver.recv_timeout(timer)?; - let entries = ledger::reconstruct_entries_from_blobs(&blobs); - let res = obj.bank.process_verified_entries(entries); - if res.is_err() { - error!("process_verified_entries {} {:?}", blobs.len(), res); - } - res?; - for blob in blobs { - blob_recycler.recycle(blob); - } - Ok(()) - } - /// This service receives messages from a leader in the network and processes the transactions /// on the bank state. /// # Arguments @@ -132,14 +112,13 @@ impl Tvu { retransmit_sender, ); - let tvu = obj.clone(); - let s_exit = exit.clone(); - let t_replicator = spawn(move || loop { - let e = Self::replicate_state(&tvu, &window_receiver, &blob_recycler); - if e.is_err() && s_exit.load(Ordering::Relaxed) { - break; - } - }); + let request_replicator = RequestReplicator::new(obj.bank.clone()); + let replicate_stage = ReplicateStage::new( + request_replicator, + exit.clone(), + window_receiver, + blob_recycler.clone(), + ); //serve pipeline // make sure we are on the same interface @@ -178,7 +157,7 @@ impl Tvu { t_blob_receiver, t_retransmit, t_window, - t_replicator, + replicate_stage.thread_hdl, t_gossip, t_listen, //serve threads @@ -375,5 +354,4 @@ mod tests { t_l_gossip.join().expect("join"); t_l_listen.join().expect("join"); } - } From bbe89df2ff7ca7ec855dcf08a361544b39a45107 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 22 May 2018 15:18:07 -0700 Subject: [PATCH 3/4] fmt --- src/lib.rs | 4 ++-- src/replicate_stage.rs | 12 ++++++++---- src/request_replicator.rs | 6 ++---- src/tvu.rs | 2 +- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4711fed1c8..c4ab14be36 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,11 +15,11 @@ pub mod mint; pub mod packet; pub mod plan; pub mod record_stage; -pub mod request_replicator; -pub mod replicate_stage; pub mod recorder; +pub mod replicate_stage; pub mod request; pub mod request_processor; +pub mod request_replicator; pub mod request_stage; pub mod result; pub mod rpu; diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 0bd259f904..449114a33c 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -2,8 +2,8 @@ use packet; use request_replicator::RequestReplicator; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::{spawn, JoinHandle}; use streamer; @@ -12,14 +12,18 @@ pub struct ReplicateStage { } impl ReplicateStage { - - pub fn new(request_replicator: RequestReplicator, exit: Arc, window_receiver: streamer::BlobReceiver, blob_recycler: packet::BlobRecycler) -> Self { + pub fn new( + request_replicator: RequestReplicator, + exit: Arc, + window_receiver: streamer::BlobReceiver, + blob_recycler: packet::BlobRecycler, + ) -> Self { let thread_hdl = spawn(move || loop { let e = request_replicator.replicate_requests(&window_receiver, &blob_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } }); - ReplicateStage{thread_hdl} + ReplicateStage { thread_hdl } } } diff --git a/src/request_replicator.rs b/src/request_replicator.rs index 3408af8a30..6af56dfe05 100644 --- a/src/request_replicator.rs +++ b/src/request_replicator.rs @@ -5,9 +5,9 @@ use bank::Bank; use ledger; use packet; use result::Result; +use std::sync::Arc; use std::time::Duration; use streamer; -use std::sync::Arc; pub struct RequestReplicator { bank: Arc, @@ -16,9 +16,7 @@ pub struct RequestReplicator { impl RequestReplicator { /// Create a new Tvu that wraps the given Bank. pub fn new(bank: Arc) -> Self { - RequestReplicator { - bank: bank, - } + RequestReplicator { bank: bank } } /// Process verified blobs, already in order diff --git a/src/tvu.rs b/src/tvu.rs index 31e291cb7b..831b8e0a92 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -7,8 +7,8 @@ use crdt::{Crdt, ReplicatedData}; use hash::Hash; use packet; use record_stage::RecordStage; -use request_replicator::RequestReplicator; use replicate_stage::ReplicateStage; +use request_replicator::RequestReplicator; use result::Result; use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; From 021953d59ae73fcabfd18c8a3729df2381297e42 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 22 May 2018 15:30:46 -0700 Subject: [PATCH 4/4] cleanup --- src/lib.rs | 1 - src/replicate_stage.rs | 29 ++++++++++++++++++++++++--- src/request_replicator.rs | 41 --------------------------------------- src/tvu.rs | 4 +--- 4 files changed, 27 insertions(+), 48 deletions(-) delete mode 100644 src/request_replicator.rs diff --git a/src/lib.rs b/src/lib.rs index c4ab14be36..930c96d3fc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,7 +19,6 @@ pub mod recorder; pub mod replicate_stage; pub mod request; pub mod request_processor; -pub mod request_replicator; pub mod request_stage; pub mod result; pub mod rpu; diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 449114a33c..54a3987f31 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -1,10 +1,13 @@ //! The `replicate_stage` replicates transactions broadcast by the leader. +use bank::Bank; +use ledger; use packet; -use request_replicator::RequestReplicator; +use result::Result; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::{spawn, JoinHandle}; +use std::time::Duration; use streamer; pub struct ReplicateStage { @@ -12,14 +15,34 @@ pub struct ReplicateStage { } impl ReplicateStage { + /// Process verified blobs, already in order + fn replicate_requests( + bank: &Arc, + verified_receiver: &streamer::BlobReceiver, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let blobs = verified_receiver.recv_timeout(timer)?; + let entries = ledger::reconstruct_entries_from_blobs(&blobs); + let res = bank.process_verified_entries(entries); + if res.is_err() { + error!("process_verified_entries {} {:?}", blobs.len(), res); + } + res?; + for blob in blobs { + blob_recycler.recycle(blob); + } + Ok(()) + } + pub fn new( - request_replicator: RequestReplicator, + bank: Arc, exit: Arc, window_receiver: streamer::BlobReceiver, blob_recycler: packet::BlobRecycler, ) -> Self { let thread_hdl = spawn(move || loop { - let e = request_replicator.replicate_requests(&window_receiver, &blob_recycler); + let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } diff --git a/src/request_replicator.rs b/src/request_replicator.rs deleted file mode 100644 index 6af56dfe05..0000000000 --- a/src/request_replicator.rs +++ /dev/null @@ -1,41 +0,0 @@ -//! The `request_replicator` is part of `replicator_stage` which replicates transactions broadcast -//! by the leader. - -use bank::Bank; -use ledger; -use packet; -use result::Result; -use std::sync::Arc; -use std::time::Duration; -use streamer; - -pub struct RequestReplicator { - bank: Arc, -} - -impl RequestReplicator { - /// Create a new Tvu that wraps the given Bank. - pub fn new(bank: Arc) -> Self { - RequestReplicator { bank: bank } - } - - /// Process verified blobs, already in order - pub fn replicate_requests( - &self, - verified_receiver: &streamer::BlobReceiver, - blob_recycler: &packet::BlobRecycler, - ) -> Result<()> { - let timer = Duration::new(1, 0); - let blobs = verified_receiver.recv_timeout(timer)?; - let entries = ledger::reconstruct_entries_from_blobs(&blobs); - let res = self.bank.process_verified_entries(entries); - if res.is_err() { - error!("process_verified_entries {} {:?}", blobs.len(), res); - } - res?; - for blob in blobs { - blob_recycler.recycle(blob); - } - Ok(()) - } -} diff --git a/src/tvu.rs b/src/tvu.rs index 831b8e0a92..4e014fddc8 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -8,7 +8,6 @@ use hash::Hash; use packet; use record_stage::RecordStage; use replicate_stage::ReplicateStage; -use request_replicator::RequestReplicator; use result::Result; use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; @@ -112,9 +111,8 @@ impl Tvu { retransmit_sender, ); - let request_replicator = RequestReplicator::new(obj.bank.clone()); let replicate_stage = ReplicateStage::new( - request_replicator, + obj.bank.clone(), exit.clone(), window_receiver, blob_recycler.clone(),