From 26272a3600c59aca045ccc575d3114cb31dc7f05 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Tue, 22 May 2018 14:26:28 -0700 Subject: [PATCH] split out stages --- src/replicate_stage.rs | 36 ++++++++++++++++++++++++++ src/request_replicator.rs | 53 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 src/replicate_stage.rs create mode 100644 src/request_replicator.rs diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs new file mode 100644 index 0000000000..6951f57e03 --- /dev/null +++ b/src/replicate_stage.rs @@ -0,0 +1,36 @@ +//! The `replicate_stage` replicates transactions broadcast by the leader. + +use bank::Bank; +use banking_stage::BankingStage; +use crdt::{Crdt, ReplicatedData}; +use hash::Hash; +use ledger; +use packet; +use record_stage::RecordStage; +use result::Result; +use sig_verify_stage::SigVerifyStage; +use std::net::UdpSocket; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::channel; +use std::sync::{Arc, RwLock}; +use std::thread::{spawn, JoinHandle}; +use std::time::Duration; +use streamer; +use write_stage::WriteStage; + +pub struct ReplicateStage { + pub thread_hdl: JoinHandle<()>, +} + +impl ReplicateStage { + + pub fn new(request_replicator: RequestReplicator, exit: Arc, window_receiver: streamer::BlobReceiver, blob_recycler: &packet::BlobRecycler) -> Self { + let thread_hdl = spawn(move || loop { + let e = request_replicator.replicate_requests(&window_receiver, &blob_recycler); + if e.is_err() && s_exit.load(Ordering::Relaxed) { + break; + } + }); + ReplicateStage{thread_hdl}; + } +} diff --git a/src/request_replicator.rs b/src/request_replicator.rs new file mode 100644 index 0000000000..bfb5ef6099 --- /dev/null +++ b/src/request_replicator.rs @@ -0,0 +1,53 @@ +//! The `request_replicator` is part of `replicator_stage` which replicates transactions broadcast +//! by the leader. + +use bank::Bank; +use banking_stage::BankingStage; +use crdt::{Crdt, ReplicatedData}; +use hash::Hash; +use ledger; +use packet; +use record_stage::RecordStage; +use result::Result; +use sig_verify_stage::SigVerifyStage; +use std::net::UdpSocket; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::channel; +use std::sync::{Arc, RwLock}; +use std::thread::{spawn, JoinHandle}; +use std::time::Duration; +use streamer; +use write_stage::WriteStage; + +pub struct RequestReplicator { + bank: Arc, +} + +impl Tvu { + /// Create a new Tvu that wraps the given Bank. + pub fn new(bank: Bank) -> Self { + RequestReplicator { + bank: Arc::new(bank), + } + } + + /// Process verified blobs, already in order + pub fn replicate_requests( + &self, + verified_receiver: &streamer::BlobReceiver, + blob_recycler: &packet::BlobRecycler, + ) -> Result<()> { + let timer = Duration::new(1, 0); + let blobs = verified_receiver.recv_timeout(timer)?; + let entries = ledger::reconstruct_entries_from_blobs(&blobs); + let res = self.bank.process_verified_entries(entries); + if res.is_err() { + error!("process_verified_entries {} {:?}", blobs.len(), res); + } + res?; + for blob in blobs { + blob_recycler.recycle(blob); + } + Ok(()) + } +}