From 021953d59ae73fcabfd18c8a3729df2381297e42 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 22 May 2018 15:30:46 -0700 Subject: [PATCH] cleanup --- src/lib.rs | 1 - src/replicate_stage.rs | 29 ++++++++++++++++++++++++--- src/request_replicator.rs | 41 --------------------------------------- src/tvu.rs | 4 +--- 4 files changed, 27 insertions(+), 48 deletions(-) delete mode 100644 src/request_replicator.rs diff --git a/src/lib.rs b/src/lib.rs index c4ab14be36..930c96d3fc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,7 +19,6 @@ pub mod recorder; pub mod replicate_stage; pub mod request; pub mod request_processor; -pub mod request_replicator; pub mod request_stage; pub mod result; pub mod rpu; diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 449114a33c..54a3987f31 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -1,10 +1,13 @@ //! The `replicate_stage` replicates transactions broadcast by the leader. +use bank::Bank; +use ledger; use packet; -use request_replicator::RequestReplicator; +use result::Result; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::{spawn, JoinHandle}; +use std::time::Duration; use streamer; pub struct ReplicateStage { @@ -12,14 +15,34 @@ pub struct ReplicateStage { } impl ReplicateStage { + /// Process verified blobs, already in order + fn replicate_requests( + bank: &Arc, + verified_receiver: &streamer::BlobReceiver, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let blobs = verified_receiver.recv_timeout(timer)?; + let entries = ledger::reconstruct_entries_from_blobs(&blobs); + let res = bank.process_verified_entries(entries); + if res.is_err() { + error!("process_verified_entries {} {:?}", blobs.len(), res); + } + res?; + for blob in blobs { + blob_recycler.recycle(blob); + } + Ok(()) + } + pub fn new( - request_replicator: RequestReplicator, + bank: Arc, exit: Arc, window_receiver: streamer::BlobReceiver, blob_recycler: packet::BlobRecycler, ) -> Self { let thread_hdl = spawn(move || loop { - let e = request_replicator.replicate_requests(&window_receiver, &blob_recycler); + let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler); if e.is_err() && exit.load(Ordering::Relaxed) { break; } diff --git a/src/request_replicator.rs b/src/request_replicator.rs deleted file mode 100644 index 6af56dfe05..0000000000 --- a/src/request_replicator.rs +++ /dev/null @@ -1,41 +0,0 @@ -//! The `request_replicator` is part of `replicator_stage` which replicates transactions broadcast -//! by the leader. - -use bank::Bank; -use ledger; -use packet; -use result::Result; -use std::sync::Arc; -use std::time::Duration; -use streamer; - -pub struct RequestReplicator { - bank: Arc, -} - -impl RequestReplicator { - /// Create a new Tvu that wraps the given Bank. - pub fn new(bank: Arc) -> Self { - RequestReplicator { bank: bank } - } - - /// Process verified blobs, already in order - pub fn replicate_requests( - &self, - verified_receiver: &streamer::BlobReceiver, - blob_recycler: &packet::BlobRecycler, - ) -> Result<()> { - let timer = Duration::new(1, 0); - let blobs = verified_receiver.recv_timeout(timer)?; - let entries = ledger::reconstruct_entries_from_blobs(&blobs); - let res = self.bank.process_verified_entries(entries); - if res.is_err() { - error!("process_verified_entries {} {:?}", blobs.len(), res); - } - res?; - for blob in blobs { - blob_recycler.recycle(blob); - } - Ok(()) - } -} diff --git a/src/tvu.rs b/src/tvu.rs index 831b8e0a92..4e014fddc8 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -8,7 +8,6 @@ use hash::Hash; use packet; use record_stage::RecordStage; use replicate_stage::ReplicateStage; -use request_replicator::RequestReplicator; use result::Result; use sig_verify_stage::SigVerifyStage; use std::net::UdpSocket; @@ -112,9 +111,8 @@ impl Tvu { retransmit_sender, ); - let request_replicator = RequestReplicator::new(obj.bank.clone()); let replicate_stage = ReplicateStage::new( - request_replicator, + obj.bank.clone(), exit.clone(), window_receiver, blob_recycler.clone(),